### Ingest name_basics file

In [0]:
# Declare the "p_data_source" text widget (empty string as its default) and read
# its current value. The value is stamped onto every row as the `data_source`
# column further down in this notebook.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

##### Step 1 - Read the bronze name_basics table

In [0]:
# Load the bronze-layer name_basics table as the raw input DataFrame.
name_basics_raw_df = spark.table("imdb_dev.bronze.name_basics")

In [0]:
# Render the raw DataFrame for visual inspection before any transformation.
display(name_basics_raw_df)

##### Step 2 - Convert columns to array type

In [0]:
from pyspark.sql.functions import col, lit, split, when

In [0]:
# Transform the primaryProfession and knownForTitles columns to arrays.
# split_to_array is not defined in this notebook (presumably it comes from
# ../includes/common_functions - confirm there how the values are split).
name_basics_type_df = (
    name_basics_raw_df
    .withColumn("primaryProfession", split_to_array("primaryProfession"))
    .withColumn("knownForTitles", split_to_array("knownForTitles"))
)

##### Step 3 - Rename columns and add the data source

In [0]:
# Rename the camelCase source columns to snake_case, then tag every row with
# the value read from the p_data_source widget as a literal data_source column.
name_basics_renamed_df = (
    name_basics_type_df
    .withColumnRenamed("primaryName", "primary_name")
    .withColumnRenamed("birthYear", "birth_year")
    .withColumnRenamed("deathYear", "death_year")
    .withColumnRenamed("primaryProfession", "primary_profession")
    .withColumnRenamed("knownForTitles", "known_for_titles")
    .withColumn("data_source", lit(v_data_source))
)

##### Step 4 - Add ingestion date

In [0]:
# add_ingestion_date is not defined in this notebook (presumably provided by
# ../includes/common_functions). Judging by its name it appends an ingestion
# date/timestamp column - TODO confirm against that helper.
name_basics_final_df = add_ingestion_date(name_basics_renamed_df)

In [0]:
# Render the final DataFrame for visual inspection before it is written out.
display(name_basics_final_df)

##### Step 5 - Write data to datalake as delta table

In [0]:
# Persist the result as the silver Delta table. The table is fully replaced on
# each run: mode("overwrite") rewrites the data, and overwriteSchema allows the
# stored schema to be replaced together with it.
(
    name_basics_final_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("imdb_dev.silver.name_basics")
)