# PySpark Notebook

## 0. Prerequisites

In [None]:
from pyspark.sql import SparkSession

POSTGRES_JAR = "C:/Program Files (x86)/PostgreSQL/pgJDBC/postgresql-42.7.2.jar"

spark = (SparkSession.builder
         .appName("Chinook_PySpark")
         .master("local[*]")
         .config("spark.jars", POSTGRES_JAR)
         .getOrCreate())

## 1. Connection details

In [2]:
PG_HOST = "localhost"
PG_PORT = 5432
PG_USER = "postgres"
PG_PASSWORD = "pass1234"
PG_DB = "chinook"

jdbc_url = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"
connection_properties = {
    "user": PG_USER,
    "password": PG_PASSWORD,
    "driver": "org.postgresql.Driver"
}
print("JDBC URL:", jdbc_url)

JDBC URL: jdbc:postgresql://localhost:5432/chinook


## 2. Helper - load any Chinook table as a DataFrame

In [3]:
from pyspark.sql import DataFrame

def load_table(table_name: str) -> DataFrame:
    print(f"Loading table '{table_name}'")
    return (spark.read
            .format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", table_name)
            .options(**connection_properties)
            .load())

In [4]:
artist_df = load_table("artist")
album_df  = load_table("album")
track_df  = load_table("track")

for name, df in [("artist", artist_df), ("album", album_df), ("track", track_df)]:
    print(f"Schema for {name}:")
    df.printSchema()
    print(f"Row count: {df.count()}")

Loading table 'artist'
Loading table 'album'
Loading table 'track'
Schema for artist:
root
 |-- artist_id: integer (nullable = true)
 |-- name: string (nullable = true)

Row count: 275
Schema for album:
root
 |-- album_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: integer (nullable = true)

Row count: 347
Schema for track:
root
 |-- track_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- album_id: integer (nullable = true)
 |-- media_type_id: integer (nullable = true)
 |-- genre_id: integer (nullable = true)
 |-- composer: string (nullable = true)
 |-- milliseconds: integer (nullable = true)
 |-- bytes: integer (nullable = true)
 |-- unit_price: decimal(10,2) (nullable = true)

Row count: 3503


## 3. Select & Filter

In [5]:
# Tracks that cost more than 99 cents, showing a few key columns
expensive_tracks = (track_df
                    .select("track_id", "name", "unit_price", "milliseconds")
                    .filter(track_df.unit_price > 0.99))

expensive_tracks.show(10, truncate=False)

+--------+--------------------------------------+----------+------------+
|track_id|name                                  |unit_price|milliseconds|
+--------+--------------------------------------+----------+------------+
|2819    |Battlestar Galactica: The Story So Far|1.99      |2622250     |
|2820    |Occupation / Precipice                |1.99      |5286953     |
|2821    |Exodus, Pt. 1                         |1.99      |2621708     |
|2822    |Exodus, Pt. 2                         |1.99      |2618000     |
|2823    |Collaborators                         |1.99      |2626626     |
|2824    |Torn                                  |1.99      |2631291     |
|2825    |A Measure of Salvation                |1.99      |2563938     |
|2826    |Hero                                  |1.99      |2713755     |
|2827    |Unfinished Business                   |1.99      |2622038     |
|2828    |The Passage                           |1.99      |2623875     |
+--------+----------------------------

## 4. Add a Column - `withColumn()`

In [6]:
from pyspark.sql import functions as F

tracks_with_minutes = track_df.withColumn(
    "length_minutes",
    F.round(track_df.milliseconds / 60000, 2)
)

tracks_with_minutes.select("name", "length_minutes").show(5, truncate=False)

+---------------------------------------+--------------+
|name                                   |length_minutes|
+---------------------------------------+--------------+
|For Those About To Rock (We Salute You)|5.73          |
|Balls to the Wall                      |5.71          |
|Fast As a Shark                        |3.84          |
|Restless and Wild                      |4.2           |
|Princess of the Dawn                   |6.26          |
+---------------------------------------+--------------+
only showing top 5 rows


## 5. GroupBy & Aggregate

In [7]:
# Number of tracks per album
tracks_per_album = (track_df
                    .groupBy("album_id")
                    .agg(F.count("track_id").alias("track_count"))
                    .orderBy(F.desc("track_count")))

tracks_per_album.show(10)

+--------+-----------+
|album_id|track_count|
+--------+-----------+
|     141|         57|
|      23|         34|
|      73|         30|
|     229|         26|
|     251|         25|
|     230|         25|
|     253|         24|
|     231|         24|
|      83|         24|
|     255|         23|
+--------+-----------+
only showing top 10 rows


## 6. Joining DataFrames

In [8]:
# Join albums to artists to list album titles with artist names
album_artist = (album_df
                .join(artist_df, album_df.artist_id == artist_df.artist_id, "inner")
                .select(artist_df.name.alias("artist"), album_df.title.alias("album")))

album_artist.show(10, truncate=False)

+----------------------------------------+-----------------------------------------------------------------------------------------------+
|artist                                  |album                                                                                          |
+----------------------------------------+-----------------------------------------------------------------------------------------------+
|Heroes                                  |Heroes, Season 1                                                                               |
|Antal Doráti & London Symphony Orchestra|Tchaikovsky: 1812 Festival Overture, Op.49, Capriccio Italien & Beethoven: Wellington's Victory|
|Frank Sinatra                           |My Way: The Best Of Frank Sinatra [Disc 1]                                                     |
|The Black Crowes                        |Live [Disc 2]                                                                                  |
|The Black Crowes          

## 7. Spark SQL

In [9]:
# Register temp views
artist_df.createOrReplaceTempView("artists")
album_df.createOrReplaceTempView("albums")
track_df.createOrReplaceTempView("tracks")

# Top 10 artists by total track count
spark.sql("""
    SELECT a.name AS artist, COUNT(t.track_id) AS num_tracks
    FROM artists a
    JOIN albums al ON a.artist_id = al.artist_id
    JOIN tracks t  ON t.album_id = al.album_id
    GROUP BY a.name
    ORDER BY num_tracks DESC
    LIMIT 10
""").show()

+---------------+----------+
|         artist|num_tracks|
+---------------+----------+
|    Iron Maiden|       213|
|             U2|       135|
|   Led Zeppelin|       114|
|      Metallica|       112|
|           Lost|        92|
|    Deep Purple|        92|
|      Pearl Jam|        67|
|  Lenny Kravitz|        57|
|Various Artists|        56|
|     The Office|        53|
+---------------+----------+



## 8. Save Results – write locally as Parquet

In [10]:
output_path = "top_artists.parquet"

spark.sql("SELECT * FROM artists LIMIT 1").write.mode("overwrite").parquet(output_path)
print("Wrote sample Parquet to", output_path)

Wrote sample Parquet to top_artists.parquet


---
### Summary Table: Common PySpark Transformations

| Transformation | PySpark Example |
|----------------|----------------|
| Select columns | `df.select("col1", "col2")` |
| Filter rows | `df.filter(df.col1 > 10)` |
| Group and agg | `df.groupBy("col1").agg(F.count("*"))` |
| Join DataFrames | `df1.join(df2, df1.key == df2.key)` |
| Add column | `df.withColumn("new_col", F.expr("col1 + col2"))` |
| Drop column | `df.drop("col_to_drop")` |
