# Part 1

# Homework Assignment: Joins in Apache Spark

## Objective
This assignment aims to familiarize you with joins in Apache Spark, a powerful distributed data processing engine. You will learn about the different types of joins available in Spark, their use cases, and how to implement them using PySpark (the Python API for Spark). We will use the famous MovieLens dataset, which is widely used in data science and machine learning for recommendation systems. This dataset allows us to demonstrate joins by combining information about movies and user ratings.

By the end of this assignment, you should be able to:
- Understand and differentiate between various join types in Spark.
- Load datasets into Spark DataFrames.
- Perform joins and analyze the results.
- Handle common issues like data skew or null values in joins.

## Prerequisites
- Install Apache Spark on your local machine or use a cloud-based environment like Databricks Community Edition (free tier) or Google Colab with PySpark installed.
- Python knowledge and familiarity with PySpark basics (e.g., creating SparkSessions and DataFrames).
- Download the MovieLens dataset as instructed below.

## Dataset: MovieLens 100K
We will use the MovieLens 100K dataset, which contains 100,000 ratings from 943 users on 1,682 movies. From its files we will build two tables:
- `movies.csv`: Contains movie details (movieId, title, genres).
- `ratings.csv`: Contains user ratings (userId, movieId, rating, timestamp).

This dataset is well suited to demonstrating joins because the movie information and the ratings share a common `movieId` column.

### Instructions to Download and Use the Dataset
1. Visit the official GroupLens website: [https://grouplens.org/datasets/movielens/100k/](https://grouplens.org/datasets/movielens/100k/).
2. Download the "ml-100k.zip" file (it's free and no registration is required).
3. Unzip the file to a directory on your local machine (e.g., `/path/to/ml-100k/`).
4. The relevant files are:
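   - `u.item` (rename to `movies.csv` for simplicity). It is pipe-separated but has more columns than we need: movie id, title, release date, video release date, IMDb URL, and 19 binary genre flags. Process it into the three columns movieId|title|genres.
   - `u.data` (rename to `ratings.csv`). It is tab-separated with the columns userId, movieId, rating, timestamp, so it can be loaded as-is with `sep='\t'`.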
5. In your PySpark code, load the files using `spark.read.csv()` with appropriate options (e.g., `header=False`, `sep='|'` for movies, `sep='\t'` for ratings).
   - Example loading code:
     ```python
     from pyspark.sql import SparkSession

     spark = SparkSession.builder.appName("JoinsAssignment").getOrCreate()

     # Load movies (assumes u.item was already converted to movieId|title|genres).
     # An explicit schema keeps movieId an integer instead of a string.
     movies_df = spark.read.csv(
         "/path/to/ml-100k/movies.csv",
         schema="movieId INT, title STRING, genres STRING",
         header=False,
         sep="|",
     )

     # Load ratings (u.data is tab-separated)
     ratings_df = spark.read.csv(
         "/path/to/ml-100k/ratings.csv",
         schema="userId INT, movieId INT, rating INT, timestamp LONG",
         header=False,
         sep="\t",
     )
     ```

If you encounter issues with file formats, use pandas to preprocess and save as proper CSV before loading into Spark.
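pandas is one option. As an alternative, here is a sketch of the `u.item` conversion that uses only the Python standard library. It assumes the layout described in the MovieLens 100K README: pipe-separated fields, with the 19 genre flags starting at the sixth field and the genre names listed in `u.genre` as `name|index`. It also assumes the source files are Latin-1 encoded, which is commonly reported for this release. The output is written as UTF-8, the default encoding for Spark's CSV reader. `convert_items` and `load_genre_names` are hypothetical helper names.

```python
def load_genre_names(genre_path):
    """Read u.genre ("name|index" per line) into a list ordered by index."""
    names = {}
    with open(genre_path, encoding="latin-1") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines (e.g. a trailing empty line)
            name, idx = line.rsplit("|", 1)
            names[int(idx)] = name
    return [names[i] for i in sorted(names)]


def convert_items(item_path, genre_path, out_path):
    """Write movieId|title|genres lines, with genre names joined by commas."""
    genres = load_genre_names(genre_path)
    n_rows = 0
    with open(item_path, encoding="latin-1") as src, \
         open(out_path, "w", encoding="utf-8") as dst:
        for line in src:
            fields = line.rstrip("\r\n").split("|")
            if len(fields) < 5 + len(genres):
                continue  # skip empty or malformed lines
            movie_id, title = fields[0], fields[1]
            flags = fields[5:5 + len(genres)]  # one "0"/"1" flag per genre
            labels = [g for g, flag in zip(genres, flags) if flag == "1"]
            dst.write(f"{movie_id}|{title}|{','.join(labels)}\n")
            n_rows += 1
    return n_rows
```

For example, call `convert_items("/path/to/ml-100k/u.item", "/path/to/ml-100k/u.genre", "/path/to/ml-100k/movies.csv")`. The genres are joined with commas so they cannot collide with the `|` column separator.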

## Background: Joins in Spark
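Joins in Spark combine two DataFrames (or more, by chaining joins) based on a related column, the join key. Spark executes joins in a distributed way, which lets them scale to large datasets. However, a typical join shuffles both DataFrames across the cluster by the join key, and that shuffle is expensive. When one side is small, a broadcast join avoids this cost: Spark sends a copy of the small DataFrame to every executor, so the large one never has to be shuffled.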

### Types of Joins in Spark
Spark provides several join types via the `DataFrame.join()` method. The syntax is:
```python
result_df = left_df.join(right_df, on="join_key_column", how="join_type")
```


# Part 2

In [None]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("MySparkApplication") \
    .getOrCreate()

In [None]:
!wget https://noaa-ghcn-pds.s3.amazonaws.com/csv.gz/2022.csv.gz
!wget https://noaa-ghcn-pds.s3.amazonaws.com/ghcnd-stations.txt

--2025-10-07 19:07:41--  https://noaa-ghcn-pds.s3.amazonaws.com/csv.gz/2022.csv.gz
Resolving noaa-ghcn-pds.s3.amazonaws.com (noaa-ghcn-pds.s3.amazonaws.com)... 52.217.89.204, 3.5.20.65, 54.231.225.57, ...
Connecting to noaa-ghcn-pds.s3.amazonaws.com (noaa-ghcn-pds.s3.amazonaws.com)|52.217.89.204|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 97608893 (93M) [application/octet-stream]
Saving to: ‘2022.csv.gz’


2025-10-07 19:07:43 (45.4 MB/s) - ‘2022.csv.gz’ saved [97608893/97608893]

--2025-10-07 19:07:43--  https://noaa-ghcn-pds.s3.amazonaws.com/ghcnd-stations.txt
Resolving noaa-ghcn-pds.s3.amazonaws.com (noaa-ghcn-pds.s3.amazonaws.com)... 52.216.206.243, 52.217.113.9, 3.5.30.182, ...
Connecting to noaa-ghcn-pds.s3.amazonaws.com (noaa-ghcn-pds.s3.amazonaws.com)|52.216.206.243|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 11150588 (11M) [application/octet-stream]
Saving to: ‘ghcnd-stations.txt’


2025-10-07 19:07:44 (15.7 MB/s) - ‘g

In [None]:
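from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, substring, expr, to_date
# getOrCreate() returns the session from the first cell if it is still running,
# so the master and appName set here do not change the running context
spark = SparkSession.builder.master("local[*]").appName("Colab").getOrCreate()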

In [None]:
# Load station metadata (fixed-width text; substring takes a 1-based start and a length)
stations_df = spark.read.text("ghcnd-stations.txt").select(
    substring(col("value"), 1, 11).alias("station"),
    substring(col("value"), 13, 8).cast("float").alias("latitude"),
    substring(col("value"), 22, 9).cast("float").alias("longitude"),
    substring(col("value"), 32, 6).cast("float").alias("elevation"),
    substring(col("value"), 42, 30).alias("name"),
)

# Load observations CSV (no header)
raw_df = spark.read.option("header", False).csv("2022.csv.gz").toDF(
    "station", "date", "element", "value", "mflag", "qflag", "sflag", "obstime"
)

# Keep TAVG rows with no quality flag (an empty qflag is read as null),
# convert the value from tenths of a degree C to degrees C, and parse the date
tavg_df = raw_df.where((col("element") == "TAVG") & col("qflag").isNull()).select(
    "station",
    to_date(col("date"), "yyyyMMdd").alias("date"),
    (col("value").cast("float") / 10).alias("TAVG")
)

# Add 'code' column ('F' for US stations, 'C' otherwise) and convert TAVG to F where needed
tavg_df = tavg_df.withColumn("code", when(substring(col("station"), 1, 2) == "US", "F").otherwise("C"))
tavg_df = tavg_df.withColumn("TAVG", when(col("code") == "F", (col("TAVG") * 9/5) + 32).otherwise(col("TAVG")))

# Join with stations
processed_df = tavg_df.join(stations_df, on="station", how="left")

# Save as Parquet (this creates the 'stationData.parquet' directory with Parquet parts)
processed_df.write.mode("overwrite").parquet("stationData.parquet")
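Spark's `substring(str, pos, len)` counts positions from 1, which is easy to get wrong when reading a fixed-width file. The plain-Python sketch below is a cross-check that needs no Spark. It reproduces the cell's column offsets and temperature conversion on one line, which it builds from the DUBAI INTL values visible in the `largeDF.show(10)` output. `spark_substring`, `parse_station_line`, and `convert_tavg` are hypothetical helper names, not part of PySpark, and `USW00000000` is a made-up US station ID.

```python
def spark_substring(s, pos, length):
    """Mimic Spark SQL substring(str, pos, len) for pos >= 1 (1-based positions)."""
    start = pos - 1
    return s[start:start + length]


def parse_station_line(line):
    """Slice one station line with the same (pos, len) pairs as the Spark cell."""
    return {
        "station": spark_substring(line, 1, 11),
        "latitude": float(spark_substring(line, 13, 8)),  # float() ignores padding spaces
        "longitude": float(spark_substring(line, 22, 9)),
        "elevation": float(spark_substring(line, 32, 6)),
        "name": spark_substring(line, 42, 30),  # still right-padded, as in the Spark output
    }


def convert_tavg(station, raw_value):
    """Tenths of a degree C -> degrees C, then to degrees F if the ID starts with 'US'."""
    celsius = raw_value / 10
    if station[:2] == "US":
        return celsius * 9 / 5 + 32, "F"
    return celsius, "C"


# Rebuild a line in the fixed-width layout: ID, LAT, LON, ELEV, STATE (blank), NAME
line = f"{'AEM00041194':<11} {25.255:8.4f} {55.364:9.4f} {10.4:6.1f} {'':2} {'DUBAI INTL':<30}"
print(parse_station_line(line))
print(convert_tavg("AEM00041194", 211))  # 211 tenths of a degree C
print(convert_tavg("USW00000000", 100))  # hypothetical US station ID
```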

In [None]:
# Parquet stores its schema with the data; "header" and "inferSchema" are
# CSV-only options and are not needed here
largeDF = spark.read \
    .parquet("stationData.parquet") \
    .limit(2000)

In [None]:
largeDF.show(10)

+-----------+----------+----+----+--------+---------+---------+--------------------+
|    station|      date|TAVG|code|latitude|longitude|elevation|                name|
+-----------+----------+----+----+--------+---------+---------+--------------------+
|AEM00041194|2022-01-01|21.1|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-02|23.4|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-03|24.1|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-04|22.7|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-05|21.4|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-06|20.8|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-07|19.7|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-08|19.7|   C|  25.255|   55.364|     10.4|DUBAI INTL       ...|
|AEM00041194|2022-01-09|19.9|   C|  25.255|   55.364|     10.4|DU