In [0]:
import json
import pandas as pd

#Lê os arquivos gerados pelo notebook que coletou os arquivos da API
rockets_source = "/Volumes/workspace/spacex/spacex/rockets.json"
launches_source = "/Volumes/workspace/spacex/spacex/launches.json"

#Transforma os JSON en DataFrames
rockets_df = spark.read.option("multiline",True).json(rockets_source)
launches_df = spark.read.option("multiline",True).json(launches_source)

#Mostra os DataFrames
display(rockets_df)
display(launches_df)

#Salva os DataFrames na camada Bronze
rockets_df.write.mode("overwrite").parquet("/Volumes/workspace/spacex/spacex/Bronze/rockets.parquet")
launches_df.write.mode("overwrite").parquet("/Volumes/workspace/spacex/spacex/Bronze/launches.parquet")

In [0]:
from pyspark.sql.functions import col

#Lê os arquivos da camada bronze
df_rockets_read = spark.read.parquet("/Volumes/workspace/spacex/spacex/Bronze/rockets.parquet/")
df_launches_read = spark.read.parquet("/Volumes/workspace/spacex/spacex/Bronze/launches.parquet/")

#Transforma os Parquets em Dataframe selecionando as colunas relevantes
df_rockets = df_rockets_read.select(
    col("id").alias("rocket_id"),
    col("name").alias("rocket_name"),
    col("boosters").alias("rocket_boosters"),
    col("country").alias("country"),
    col("company").alias("company"),
    col("cost_per_launch").alias("rocket_cost_per_launch"),
    col("success_rate_pct").alias("rocket_success_rate_pct"),
    col("first_flight").alias("rocket_first_flight"),
    col("type").alias("rocket_type")
)

df_launches = df_launches_read.select(
    col("id").alias("launche_id"),
    col("name").alias("launche_name"),
    col("date_utc").alias("launche_date_utc"),
    col("success").alias("launche_success"),
    col("rocket").alias("launche_rocket_id"),
    col("flight_number").alias("launche_flight_number")
)

#Mostra os DataFrames
display(df_rockets)
display(df_launches)

#Faz o Join entre as tabelas transformando em um só arquivo
df_rockets_launches = df_launches.join(
    df_rockets,
    df_launches.launche_rocket_id == df_rockets.rocket_id,
    "inner"
)

#Mostra o DataFrame
display(df_rockets_launches)

#Salva o DataFrame na camada Silver
df_rockets_launches.write.mode("overwrite").parquet("/Volumes/workspace/spacex/spacex/Silver/tb_rockets_launches.parquet")

In [0]:
from pyspark.sql.functions import col, to_date

#Lê os arquivos da camada Silver
df_rocket_launches_read = spark.read.parquet(
    "/Volumes/workspace/spacex/spacex/Silver/tb_rockets_launches.parquet/"
)

#Realiza os tratamentos pertinentes na tabela
tb_rockets_launches_gold = df_rocket_launches_read.select(
    col("launche_id"),
    col("launche_name"),
    to_date(col("launche_date_utc")).alias("launche_date_utc_date"),
    col("launche_success").cast("boolean").alias("launche_success_bol"),
    col("launche_rocket_id"),
    col("launche_flight_number"),
    col("rocket_id"),
    col("rocket_name"),
    col("rocket_boosters"),
    col("country"),
    col("company"),
    col("rocket_cost_per_launch").cast("double").alias("rocket_cost_per_launch"),
    (col("rocket_success_rate_pct").cast("double") / 100).alias("rocket_success_rate_pct"),
    to_date(col("rocket_first_flight")).alias("rocket_first_flight_date"),
    col("rocket_type")
)

#Mostra os dados
display(tb_rockets_launches_gold)

#Salva os dados na camada Gold
tb_rockets_launches_gold.write.mode("overwrite").parquet("/Volumes/workspace/spacex/spacex/Gold/tb_rockets_launches_gold.parquet")

