Reading CSV data with an inferred schema

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import transform, col, concat, lit
# Create a new SparkSession
spark = (SparkSession
         .builder
         .appName("read-csv-data")
         .master("spark://spark-master:7077")
         .config("spark.executor.memory", "512m")
         .getOrCreate())

# Set log level to ERROR
spark.sparkContext.setLogLevel("ERROR")

# Read CSV file into a DataFrame
df = (spark.read
      .format("csv")
      .option("header", "true")
      .load("/FileStore/tables/netflix_titles-1.csv"))

# Alternatively
## If your CSV file does not have a header row

# df = (spark.read
#       .format("csv")
#       .option("header", "false") # When the CSV file does not have any headers
#       .load("/FileStore/tables/netflix_titles-1.csv"))
# Display contents of DataFrame
df.show()

# Alternatively

# df.show(50)  # Display first 50 rows
# df.show(10, truncate=False)  # Display first 10 rows without truncation

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

In [0]:
# Print schema of DataFrame
df.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



Reading CSV data with explicit schema

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Define a Schema
schema = StructType([
    StructField("show_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("title", StringType(), True),
    StructField("director", StringType(), True),
    StructField("cast", StringType(), True),
    StructField("country", StringType(), True),
    StructField("date_added", DateType(), True),
    StructField("release_year", IntegerType(), True),
    StructField("rating", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("listed_in", StringType(), True),
    StructField("description", StringType(), True)])

In [0]:
# Read CSV file into a DataFrame
df = (spark.read.format("csv")
      .option("header", "true")
      .schema(schema)
      .load("/FileStore/tables/netflix_titles-1.csv"))

      # Display contents of DataFrame
# Display first 5 rows of DataFrame
df.show(5)

+-------+-------+--------------------+---------------+--------------------+-------------+----------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+----------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                null|United States|      null|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           null|Ama Qamata, Khosi...| South Africa|      null|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Julien Leclercq|Sami Bouajila, Tr...|         null|      null|        2021| TV-MA| 1 Season|Crime

Basic Transformation

In [0]:
df = (spark.read.format("json")
      .option("multiLine", "true")
      .load("/FileStore/tables/nobel_prizes.json"))

In [0]:
# Apply transform function to Numbers column
df_transformed = (
    df.select("category"
              , "overallMotivation"
              , "year"
              , "laureates"
              , transform(col("laureates"), lambda x: concat(x.firstname,lit(" "), x.surname))
              .alias("laureates_full_name")))

df_transformed.show()

+----------+--------------------+----+--------------------+--------------------+
|  category|   overallMotivation|year|           laureates| laureates_full_name|
+----------+--------------------+----+--------------------+--------------------+
| chemistry|                null|2022|[{Carolyn, 1015, ...|[Carolyn Bertozzi...|
| economics|                null|2022|[{Ben, 1021, "for...|[Ben Bernanke, Do...|
|literature|                null|2022|[{Annie, 1017, "f...|      [Annie Ernaux]|
|     peace|                null|2022|[{Ales, 1018, "Th...|[Ales Bialiatski ...|
|   physics|                null|2022|[{Alain, 1012, "f...|[Alain Aspect, nu...|
|  medicine|                null|2022|[{Svante, 1011, "...|      [Svante Pääbo]|
| chemistry|                null|2021|[{Benjamin, 1002,...|[Benjamin List, D...|
| economics|                null|2021|[{David, 1007, "f...|[David Card, Josh...|
|literature|                null|2021|[{Abdulrazak, 100...| [Abdulrazak Gurnah]|
|     peace|                

In [0]:
df_deduped = df.dropDuplicates(["category","overallMotivation", "year"])

df_deduped.show()

+---------+--------------------+-----------------+----+
| category|           laureates|overallMotivation|year|
+---------+--------------------+-----------------+----+
|chemistry|[{Jacobus H., 160...|             null|1901|
|chemistry|[{Emil, 161, "in ...|             null|1902|
|chemistry|[{Svante, 162, "i...|             null|1903|
|chemistry|[{Sir William, 16...|             null|1904|
|chemistry|[{Adolf, 164, "in...|             null|1905|
|chemistry|[{Henri, 165, "in...|             null|1906|
|chemistry|[{Eduard, 166, "f...|             null|1907|
|chemistry|[{Ernest, 167, "f...|             null|1908|
|chemistry|[{Wilhelm, 168, "...|             null|1909|
|chemistry|[{Otto, 169, "in ...|             null|1910|
|chemistry|[{Marie, 6, "in r...|             null|1911|
|chemistry|[{Victor, 172, "f...|             null|1912|
|chemistry|[{Alfred, 174, "i...|             null|1913|
|chemistry|[{Theodore W., 17...|             null|1914|
|chemistry|[{Richard, 176, "...|             nul

In [0]:
# Sort by year in ascending order
df_sorted = df.orderBy("year")

df_sorted.show()

+----------+--------------------+-----------------+----+
|  category|           laureates|overallMotivation|year|
+----------+--------------------+-----------------+----+
| chemistry|[{Jacobus H., 160...|             null|1901|
|literature|[{Sully, 569, "in...|             null|1901|
|     peace|[{Henry, 462, "fo...|             null|1901|
|   physics|[{Wilhelm Conrad,...|             null|1901|
|  medicine|[{Emil, 293, "for...|             null|1901|
| chemistry|[{Emil, 161, "in ...|             null|1902|
|literature|[{Theodor, 571, "...|             null|1902|
|     peace|[{Élie, 464, "for...|             null|1902|
|   physics|[{Hendrik A., 2, ...|             null|1902|
|  medicine|[{Ronald, 294, "f...|             null|1902|
|literature|[{Bjørnstjerne, 5...|             null|1903|
| chemistry|[{Svante, 162, "i...|             null|1903|
|     peace|[{Randal, 466, "f...|             null|1903|
|   physics|[{Henri, 4, "in r...|             null|1903|
|  medicine|[{Niels Ryberg, 2..

In [0]:
# Sort by year in descending order, then by category in ascending order
df_sorted = df.orderBy(["year", "category"], ascending=[False, True])

df_sorted.show()

+----------+--------------------+--------------------+----+
|  category|           laureates|   overallMotivation|year|
+----------+--------------------+--------------------+----+
| chemistry|[{Carolyn, 1015, ...|                null|2022|
| economics|[{Ben, 1021, "for...|                null|2022|
|literature|[{Annie, 1017, "f...|                null|2022|
|  medicine|[{Svante, 1011, "...|                null|2022|
|     peace|[{Ales, 1018, "Th...|                null|2022|
|   physics|[{Alain, 1012, "f...|                null|2022|
| chemistry|[{Benjamin, 1002,...|                null|2021|
| economics|[{David, 1007, "f...|                null|2021|
|literature|[{Abdulrazak, 100...|                null|2021|
|  medicine|[{David, 997, "fo...|                null|2021|
|     peace|[{Maria, 1005, "f...|                null|2021|
|   physics|[{Syukuro, 999, "...|"for groundbreaki...|2021|
| chemistry|[{Emmanuelle, 991...|                null|2020|
| economics|[{Paul, 995, "for...|       