In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import lit, when, col
from pyspark.sql import SparkSession


In [0]:
%run "/Users/karolinzajac@student.agh.edu.pl/Mini-Kurs-ETL-Spark/3. Python/3. Pobierz Dane"

In [0]:
display(dbutils.fs.ls("dbfs:/FileStore/tables/Files/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/Files/actors.csv,actors.csv,41451610,1742494670000
dbfs:/FileStore/tables/Files/movies.csv,movies.csv,50657522,1742494677000
dbfs:/FileStore/tables/Files/movies.json/,movies.json/,0,0
dbfs:/FileStore/tables/Files/movies_corrupted.csv/,movies_corrupted.csv/,0,0
dbfs:/FileStore/tables/Files/names.csv,names.csv,62194757,1742494685000
dbfs:/FileStore/tables/Files/ratings.csv,ratings.csv,17363252,1742494695000


In [0]:
schema = StructType([
    StructField("id", StringType(), False),
    StructField("title", StringType(), False),
    StructField("original_title", StringType(), False),
    StructField("release_year", IntegerType(), False)
])

In [0]:
filePath = "dbfs:/FileStore/tables/Files/movies.csv"
df = spark.read.format("csv") \
            .option("header","true") \
            .schema(schema) \
            .load(filePath)
display(df)

id,title,original_title,release_year
tt0000009,Miss Jerry,Miss Jerry,1894
tt0000574,The Story of the Kelly Gang,The Story of the Kelly Gang,1906
tt0001892,Den sorte drøm,Den sorte drøm,1911
tt0002101,Cleopatra,Cleopatra,1912
tt0002130,L'Inferno,L'Inferno,1911
tt0002199,"From the Manger to the Cross or, Jesus of Nazareth","From the Manger to the Cross or, Jesus of Nazareth",1912
tt0002423,Madame DuBarry,Madame DuBarry,1919
tt0002445,Quo Vadis?,Quo Vadis?,1913
tt0002452,Independenta Romaniei,Independenta Romaniei,1912
tt0002461,Richard III,Richard III,1912


In [0]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- release_year: integer (nullable = true)



###Zadanie 3

In [0]:
df_corrupted = df.withColumn("release_year", 
    when(df["release_year"] == "1912", "N/A") 
    .when(df["release_year"] == "1913", "??") 
)
df_corrupted.show()
corruptedPath = "dbfs:/FileStore/tables/Files/movies_corrupted.csv"

df_corrupted.write.mode("overwrite") \
    .option("header", "true") \
    .csv(corruptedPath)

+---------+--------------------+--------------------+------------+
|       id|               title|      original_title|release_year|
+---------+--------------------+--------------------+------------+
|tt0000009|          Miss Jerry|          Miss Jerry|        null|
|tt0000574|The Story of the ...|The Story of the ...|        null|
|tt0001892|      Den sorte drøm|      Den sorte drøm|        null|
|tt0002101|           Cleopatra|           Cleopatra|         N/A|
|tt0002130|           L'Inferno|           L'Inferno|        null|
|tt0002199|From the Manger t...|From the Manger t...|         N/A|
|tt0002423|      Madame DuBarry|      Madame DuBarry|        null|
|tt0002445|          Quo Vadis?|          Quo Vadis?|          ??|
|tt0002452|Independenta Roma...|Independenta Roma...|         N/A|
|tt0002461|         Richard III|         Richard III|         N/A|
|tt0002646|            Atlantis|            Atlantis|          ??|
|tt0002844|Fantômas - À l'om...|Fantômas - À l'om...|         

In [0]:
fileswithschema= spark.read.format("csv") \
            .option("mode","PERMISSIVE") \
            .schema(schema) \
            .load(corruptedPath)
display(fileswithschema)

id,title,original_title,release_year
id,title,original_title,
tt0000009,Miss Jerry,Miss Jerry,
tt0000574,The Story of the Kelly Gang,The Story of the Kelly Gang,
tt0001892,Den sorte drøm,Den sorte drøm,
tt0002101,Cleopatra,Cleopatra,
tt0002130,L'Inferno,L'Inferno,
tt0002199,"From the Manger to the Cross or, Jesus of Nazareth","From the Manger to the Cross or, Jesus of Nazareth",
tt0002423,Madame DuBarry,Madame DuBarry,
tt0002445,Quo Vadis?,Quo Vadis?,
tt0002452,Independenta Romaniei,Independenta Romaniei,


In [0]:
fileswithschema= spark.read.format("csv") \
            .option("mode","DROPMALFORMED") \
            .schema(schema) \
            .load(corruptedPath)
display(fileswithschema)

id,title,original_title,release_year
tt0000009,Miss Jerry,Miss Jerry,
tt0000574,The Story of the Kelly Gang,The Story of the Kelly Gang,
tt0001892,Den sorte drøm,Den sorte drøm,
tt0002130,L'Inferno,L'Inferno,
tt0002423,Madame DuBarry,Madame DuBarry,
tt0003102,Ma l'amor mio non muore...,Ma l'amor mio non muore...,
tt0003131,Maudite soit la guerre,Maudite soit la guerre,
tt0003167,Amore di madre,"Home, Sweet Home",
tt0003637,Assunta Spina,Assunta Spina,
tt0003643,The Avenging Conscience: or 'Thou Shalt Not Kill',The Avenging Conscience: or 'Thou Shalt Not Kill',


In [0]:
fileswithschema= spark.read.format("csv") \
            .option("mode","FAILFAST") \
            .schema(schema) \
            .load(corruptedPath)
display(fileswithschema)

###Zadanie 4


In [0]:
jsonPath = "dbfs:/FileStore/tables/Files/movies.json"
df.write.mode("overwrite").json(jsonPath)


In [0]:
df_json = spark.read.json(jsonPath)
df_json.show()

+---------+--------------------+------------+--------------------+
|       id|      original_title|release_year|               title|
+---------+--------------------+------------+--------------------+
|tt0000009|          Miss Jerry|        1894|          Miss Jerry|
|tt0000574|The Story of the ...|        1906|The Story of the ...|
|tt0001892|      Den sorte drøm|        1911|      Den sorte drøm|
|tt0002101|           Cleopatra|        1912|           Cleopatra|
|tt0002130|           L'Inferno|        1911|           L'Inferno|
|tt0002199|From the Manger t...|        1912|From the Manger t...|
|tt0002423|      Madame DuBarry|        1919|      Madame DuBarry|
|tt0002445|          Quo Vadis?|        1913|          Quo Vadis?|
|tt0002452|Independenta Roma...|        1912|Independenta Roma...|
|tt0002461|         Richard III|        1912|         Richard III|
|tt0002646|            Atlantis|        1913|            Atlantis|
|tt0002844|Fantômas - À l'om...|        1913|Fantômas - À l'om