In [90]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType

In [91]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('bonus-exercises')
    .getOrCreate()
)

1. Read the movies CSV as a PySpark dataframe

In [92]:
# Load the movies CSV, taking column names from its first line and letting
# Spark infer the column types from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('csv/movies.csv')
)

In [93]:
# Show the column names and the types Spark inferred for them.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- rank: string (nullable = true)



In [94]:
# Preview roughly 1% of the rows. The seed is fixed so the same preview is
# produced on every run of the notebook instead of a different random sample.
df.sample(fraction=0.01, seed=42).show(10)

+----+--------------------+----+----+
|  id|                name|year|rank|
+----+--------------------+----+----+
|  70|       'Brennus', Le|1897|NULL|
| 460|  ...und Du bist die|1992|NULL|
| 477|...und sowas nenn...|1971|NULL|
| 838|1001 Posies do Am...|1979|NULL|
|1007| 120, rue de la Gare|1946|NULL|
|1087|    13th Letter, The|1951| 5.8|
|1269|     18 and Nasty 34|2003|NULL|
|1313|18/68: Venecia ka...|1968|NULL|
|1327|                1818|1997|NULL|
|1449|1995 MTV Movie Aw...|1995|NULL|
+----+--------------------+----+----+
only showing top 10 rows



In [95]:
# Convert the year and rank columns from string to numeric types.
df = (
    df
    .withColumn("year", col("year").cast("int"))
    .withColumn("rank", col("rank").cast("double"))
)

In [96]:
# Check the schema again after casting year and rank.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- rank: double (nullable = true)



In [97]:
# Total number of rows in the movies dataframe.
df.count()

388269

2. Group the dataframe by year to show the number of movies per year

In [98]:
# Number of movies per year: one output row per distinct value of `year`.
df_grouped = df.groupBy('year').count()

In [99]:
# Peek at the per-year counts; no sort is applied here, so the row order is arbitrary.
df_grouped.show(10)

+----+-----+
|year|count|
+----+-----+
|1959| 2839|
|1990| 5949|
|1896|  410|
|1903|  831|
|1975| 4269|
|1977| 3905|
|1924| 1847|
|2003|11606|
|2007|    7|
|1892|    9|
+----+-----+
only showing top 10 rows



3. Sort the grouped dataframe by year, ascending and descending. Do you see anything suspicious?  
The dataframe probably contains malformed/corrupted data. PySpark provides appropriate options to handle such situations.

In [100]:
# Smallest year values first, to inspect the low end of the range.
df_grouped.orderBy(col('year').asc()).show(5)

+----+-----+
|year|count|
+----+-----+
|NULL| 7210|
|   0|    1|
|   2|    4|
|1888|    2|
|1890|    3|
+----+-----+
only showing top 5 rows



In [101]:
# Largest year values first, to inspect the high end of the range.
df_grouped.orderBy(col('year').desc()).show(5)

+----+-----+
|year|count|
+----+-----+
|2008|    1|
|2007|    7|
|2006|  194|
|2005| 1433|
|2004| 8521|
+----+-----+
only showing top 5 rows



The year field is not meant to be nullable, yet many rows end up with a NULL or implausible year, mostly because of unescaped commas in the data.

4. Could you read df_movies again in a way that lets you separate the good lines from the corrupted ones and store the corrupted lines in another dataframe?  
Try the groupby+sort step again with the clean dataset. Better?

In [102]:
# Explicit schema for the movies CSV, plus one extra string column in which
# Spark stores the raw text of any line it cannot parse against the schema.
# NOTE: nullable=False only documents intent here; in PERMISSIVE mode Spark
# sets malformed fields to NULL instead of rejecting the row.
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("year", IntegerType(), False),
    StructField("rank", FloatType(), True),
    StructField("_corrupt_record", StringType(), True),
])

# PERMISSIVE mode keeps malformed lines (rather than dropping them or failing
# the read) and records their original text in the `_corrupt_record` column.
df_movies = (
    spark.read
    .schema(schema)
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("mode", "PERMISSIVE")
    .csv("csv/movies.csv", header=True)
)

# Cache before querying: since Spark 2.3, queries on a raw CSV source that
# reference only the internal corrupt-record column (such as filtering on
# `_corrupt_record` and counting) are disallowed, so the parsed result has to
# be cached (or saved) first.
df_movies.cache()

DataFrame[id: int, name: string, year: int, rank: float, _corrupt_record: string]

In [103]:
df_movies.count()

388269

In [104]:
# Split on the corrupt-record column: it is NULL for lines that parsed cleanly
# and holds the raw line text for lines that did not.
df_movies_good = df_movies.where(col("_corrupt_record").isNull())
df_movies_bad = df_movies.where(col("_corrupt_record").isNotNull())

After capturing corrupted values we can see that the non-corrupted rows contain only acceptable values for the 'year' column

In [105]:
# Movies per year among the cleanly parsed rows, earliest years first.
(
    df_movies_good
    .groupBy("year")
    .count()
    .orderBy("year")
    .show(5)
)

+----+-----+
|year|count|
+----+-----+
|1888|    2|
|1890|    3|
|1891|    6|
|1892|    9|
|1893|    2|
+----+-----+
only showing top 5 rows



And the corrupted dataframe contains only rows with an invalid year (NULL or 0)

In [106]:
# Year values found among the rows flagged as corrupt, smallest first.
(
    df_movies_bad
    .groupBy("year")
    .count()
    .orderBy("year")
    .show(5)
)

+----+-----+
|year|count|
+----+-----+
|NULL| 7214|
|   0|    1|
+----+-----+



In [107]:
# Number of rows that parsed cleanly against the schema.
df_movies_good.count()

381054

In [108]:
# Number of rows Spark flagged as corrupt.
df_movies_bad.count()

7215

Not part of the exercises, but let's try to recover the missing years from the corrupt rows

In [109]:
# Schema with the four data columns only (no corrupt-record column).
schema = (
    StructType()
    .add("id", IntegerType(), False)
    .add("name", StringType(), False)
    .add("year", IntegerType(), False)
    .add("rank", FloatType(), True)
)

The name field contains single and triple quotes. Using '"' as both the quote and the escape character is not perfect, but it does the trick of recovering the years.

In [110]:
# Re-read the CSV with the double quote acting as both the quote character and
# the escape character.
df = spark.read.csv(
    "csv/movies.csv",
    schema=schema,
    header=True,
    quote="\"",
    escape="\"",
)


In [111]:
# Row count of the re-read dataframe, for comparison with the earlier reads.
df.count()

388269

In [112]:
# Earliest years first: check the low end of the range after the re-read.
df.orderBy(col("year").asc()).show(5)

+------+--------------------+----+----+
|    id|                name|year|rank|
+------+--------------------+----+----+
|282455|Roundhay Garden S...|1888|NULL|
|337409|Traffic Crossing ...|1888|NULL|
|218186| Monkeyshines, No. 1|1890| 7.3|
|218187| Monkeyshines, No. 2|1890|NULL|
|218188| Monkeyshines, No. 3|1890|NULL|
+------+--------------------+----+----+
only showing top 5 rows



In [113]:
# Latest years first: check the high end of the range after the re-read.
df.orderBy(col("year").desc()).show(5)

+------+--------------------+----+----+
|    id|                name|year|rank|
+------+--------------------+----+----+
|139653|Harry Potter and ...|2008|NULL|
|272424|  Rapunzel Unbraided|2007|NULL|
| 92850|        DragonBall Z|2007|NULL|
|311040|        Spider-Man 3|2007|NULL|
|139654|Harry Potter and ...|2007|NULL|
+------+--------------------+----+----+
only showing top 5 rows

