Transformation

In [0]:
# Name of the source table; its contents are loaded into a Spark DataFrame.
# `spark` is not created in this cell — presumably supplied by the notebook runtime (TODO confirm).
table_name = 'default.data'
df = spark.table(table_name)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

# No SparkSession is built here; the existing `spark` session is assumed to be
# provided by the notebook runtime (TODO confirm).

# Split the comma-separated title ids into an array column.
df_transformed = df.withColumn(
    colName="knownForTitlesArray",
    col=split(df["knownForTitles"], ","),
)


In [0]:
# Split the comma-separated profession string into an array column.
df_transformed = df_transformed.withColumn(
    colName="primaryProfessionArray",
    col=split(df["primaryProfession"], ","),
)


In [0]:
# Preview the first 20 rows (the default) without truncating long cell values.
df_transformed.show(n=20, truncate=False)

+---------+-------------------+---------+---------+-------------------------------------+---------------------------------------+--------------------------------------------+-----------------------------------------+
|nconst   |primaryName        |birthYear|deathYear|primaryProfession                    |knownForTitles                         |knownForTitlesArray                         |primaryProfessionArray                   |
+---------+-------------------+---------+---------+-------------------------------------+---------------------------------------+--------------------------------------------+-----------------------------------------+
|nm0000001|Fred Astaire       |1899     |1987     |soundtrack,actor,miscellaneous       |tt0045537,tt0072308,tt0053137,tt0050419|[tt0045537, tt0072308, tt0053137, tt0050419]|[soundtrack, actor, miscellaneous]       |
|nm0000002|Lauren Bacall      |1924     |2014     |actress,soundtrack                   |tt0037382,tt0117057,tt0075213,tt0038355|[tt

In [0]:
# Register the transformed frame as a temp view so it can be queried by name from SQL.
df_transformed.createOrReplaceTempView(name="df_transformed_table")

In [0]:
%sql

-- Return every row and column of the df_transformed_table view.
SELECT * FROM df_transformed_table;

nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles,knownForTitlesArray,primaryProfessionArray
nm0000001,Fred Astaire,1899,1987,"soundtrack,actor,miscellaneous","tt0045537,tt0072308,tt0053137,tt0050419","List(tt0045537, tt0072308, tt0053137, tt0050419)","List(soundtrack, actor, miscellaneous)"
nm0000002,Lauren Bacall,1924,2014,"actress,soundtrack","tt0037382,tt0117057,tt0075213,tt0038355","List(tt0037382, tt0117057, tt0075213, tt0038355)","List(actress, soundtrack)"
nm0000003,Brigitte Bardot,1934,\N,"actress,soundtrack,music_department","tt0056404,tt0057345,tt0049189,tt0054452","List(tt0056404, tt0057345, tt0049189, tt0054452)","List(actress, soundtrack, music_department)"
nm0000004,John Belushi,1949,1982,"actor,soundtrack,writer","tt0072562,tt0080455,tt0077975,tt0078723","List(tt0072562, tt0080455, tt0077975, tt0078723)","List(actor, soundtrack, writer)"
nm0000005,Ingmar Bergman,1918,2007,"writer,director,actor","tt0050986,tt0083922,tt0050976,tt0069467","List(tt0050986, tt0083922, tt0050976, tt0069467)","List(writer, director, actor)"
nm0000006,Ingrid Bergman,1915,1982,"actress,soundtrack,producer","tt0038787,tt0034583,tt0036855,tt0038109","List(tt0038787, tt0034583, tt0036855, tt0038109)","List(actress, soundtrack, producer)"
nm0000007,Humphrey Bogart,1899,1957,"actor,soundtrack,producer","tt0034583,tt0042593,tt0043265,tt0037382","List(tt0034583, tt0042593, tt0043265, tt0037382)","List(actor, soundtrack, producer)"
nm0000008,Marlon Brando,1924,2004,"actor,soundtrack,director","tt0078788,tt0068646,tt0047296,tt0070849","List(tt0078788, tt0068646, tt0047296, tt0070849)","List(actor, soundtrack, director)"
nm0000009,Richard Burton,1925,1984,"actor,soundtrack,producer","tt0057877,tt0059749,tt0061184,tt0087803","List(tt0057877, tt0059749, tt0061184, tt0087803)","List(actor, soundtrack, producer)"
nm0000010,James Cagney,1899,1986,"actor,soundtrack,director","tt0042041,tt0029870,tt0031867,tt0035575","List(tt0042041, tt0029870, tt0031867, tt0035575)","List(actor, soundtrack, director)"


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


# Target column types for the base columns; every field is declared nullable.
expected_schema = StructType(
    [
        StructField(column, spark_type(), True)
        for column, spark_type in (
            ("nconst", StringType),
            ("primaryName", StringType),
            ("birthYear", IntegerType),
            ("deathYear", IntegerType),
            ("primaryProfession", StringType),
            ("knownForTitles", StringType),
        )
    ]
)

def validate_and_fix_schema(dataframe, expected_schema):
    """Align the data types of ``dataframe`` with ``expected_schema``.

    For every field in ``expected_schema``:
      * if the column exists with a different data type, it is cast to the
        expected type;
      * if the column exists with the expected type, it is left untouched;
      * if the column is missing, it is added as an all-null column of the
        expected type.

    Only data types are reconciled. Columns that are not listed in
    ``expected_schema`` are kept as they are, and nullability and column
    order are not changed.

    Args:
        dataframe: Spark DataFrame to check.
        expected_schema: ``StructType`` describing the required columns.

    Returns:
        The original DataFrame when its schema already equals
        ``expected_schema``; otherwise a new DataFrame with the casts and
        added columns applied.
    """
    # Imported locally: the missing-column branch needs `lit`, which the
    # notebook's import cells do not bring into scope.
    from pyspark.sql.functions import lit

    actual_schema = dataframe.schema

    if actual_schema != expected_schema:
        actual_types = {f.name: f.dataType for f in actual_schema.fields}

        for field in expected_schema.fields:
            column_name = field.name
            if column_name not in actual_types:
                # Missing column: add it filled with typed nulls.
                dataframe = dataframe.withColumn(
                    column_name,
                    lit(None).cast(field.dataType),
                )
            elif actual_types[column_name] != field.dataType:
                # NOTE(review): values that cannot be cast appear as NULL in the
                # output captured in this notebook; with ANSI mode enabled Spark
                # may raise instead — confirm for the target runtime.
                dataframe = dataframe.withColumn(
                    column_name,
                    dataframe[column_name].cast(field.dataType),
                )

    return dataframe

# Reconcile the transformed frame with the expected column types.
df1 = validate_and_fix_schema(
    dataframe=df_transformed,
    expected_schema=expected_schema,
)

# Preview the result using show()'s defaults (20 rows, long values truncated).
df1.show(n=20, truncate=True)




+---------+-------------------+---------+---------+--------------------+--------------------+--------------------+----------------------+
|   nconst|        primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles| knownForTitlesArray|primaryProfessionArray|
+---------+-------------------+---------+---------+--------------------+--------------------+--------------------+----------------------+
|nm0000001|       Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0045537,tt00723...|[tt0045537, tt007...|  [soundtrack, acto...|
|nm0000002|      Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0037382,tt01170...|[tt0037382, tt011...|  [actress, soundtr...|
|nm0000003|    Brigitte Bardot|     1934|     NULL|actress,soundtrac...|tt0056404,tt00573...|[tt0056404, tt005...|  [actress, soundtr...|
|nm0000004|       John Belushi|     1949|     1982|actor,soundtrack,...|tt0072562,tt00804...|[tt0072562, tt008...|  [actor, soundtrac...|
|nm0000005|     Ingmar Bergman|   

In [0]:
# Display the schema of the fixed frame as the cell's output.
df1.schema

StructType([StructField('nconst', StringType(), True), StructField('primaryName', StringType(), True), StructField('birthYear', IntegerType(), True), StructField('deathYear', IntegerType(), True), StructField('primaryProfession', StringType(), True), StructField('knownForTitles', StringType(), True), StructField('knownForTitlesArray', ArrayType(StringType(), False), True), StructField('primaryProfessionArray', ArrayType(StringType(), False), True)])

In [0]:

# Persist the fixed frame as a table, overwriting any existing data.
df1.write.mode("overwrite").saveAsTable("default.df_table")