In [1]:
!pip install pyspark



In [2]:
import zipfile
import os
import pandas as pd
from pathlib import Path
from pyspark.sql import SparkSession
from tempfile import TemporaryDirectory
import pyspark.sql.functions as F
from pyspark.sql import Window
from datetime import datetime

In [3]:
spark = SparkSession.builder.appName("Exercise7").getOrCreate()

In [4]:
import os
import zipfile
from pyspark.sql import SparkSession

def read_csv_from_zip(download_loc: str = '/content/'):
    """
    Read CSV files from ZIP archives directly into Spark DataFrame using a direct approach.

    Args:
        download_loc (str): Directory path containing ZIP files

    Returns:
        pyspark.sql.DataFrame: Spark DataFrame containing CSV data

    Raises:
        FileNotFoundError: If no ZIP files found
        ValueError: If no CSV files found in ZIP
    """
    try:
        # Find first ZIP file in directory
        zip_files = [f for f in os.listdir(download_loc) if f.endswith('.zip')]
        if not zip_files:
            raise FileNotFoundError("No ZIP files found in directory")

        file_path = os.path.abspath(os.path.join(download_loc, zip_files[0]))
        print(f"Processing ZIP file: {file_path}")

        # Create an extraction directory if it doesn't exist
        extract_dir = os.path.join(download_loc, 'temp_extract')
        os.makedirs(extract_dir, exist_ok=True)

        # Use context manager for ZIP file handling
        with zipfile.ZipFile(file_path) as zip_ref:
            # Find first CSV file in ZIP
            csv_files = [f for f in zip_ref.namelist() if f.endswith('.csv')]
            if not csv_files:
                raise ValueError("No CSV files found in ZIP archive")

            # Extract CSV file
            csv_path = zip_ref.extract(csv_files[0], extract_dir)

            try:
                # Read directly into Spark DataFrame
                df = spark.read.csv(
                    csv_path,
                    header=True,
                    inferSchema=True,
                    mode='PERMISSIVE',
                    multiLine=True,
                    escape='"'
                )
                return df
            finally:
              pass

    except Exception as e:
        print(f"Error processing file: {str(e)}")
        raise

In [5]:
df = read_csv_from_zip()
df.show()

Processing ZIP file: /content/hard-drive-2022-01-01-failures.csv.zip
+----------+--------------+--------------------+--------------+-------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+----

1. Add the file name as a column to the DataFrame and call it `source_file`.

In [6]:
# add a column and place all values to be a string
df = df.withColumn('source_file', F.lit('hard-drive-2022-01-01-failures.csv'))

2. Pull the date located inside the string of the `source_file` column. Final data-type must be date or timestamp, not a string. Call the new column `file_date`.

In [7]:
df = df.withColumn("file_date", F.to_date(F.regexp_extract(df["source_file"], r"(\d{4}-\d{2}-\d{2})", 1), "yyyy-MM-dd"))
df.schema['file_date'].dataType

DateType()

3. Add a new column called `brand`. It will be based on the column `model`. If the column `model` has a space ... aka   in it, split on that space. The value found before the space   will be considered the brand. If there is no space to split on, fill in a value called unknown for the brand.

In [8]:
df = df.withColumn('brand',
    F.when(F.col("model").contains(" "), F.split(F.col("model")," ")[0]).otherwise("unknown")
)

4. Inspect a column called `capacity_bytes`. Create a secondary DataFrame that relates `capacity_bytes` to the model column, create "buckets" / "rankings" for those models with the most capacity to the least. Bring back that data as a column called storage_ranking into the main dataset.

In [9]:
new_df = df.select('capacity_bytes', 'model')
new_df.show()

+--------------+--------------------+
|capacity_bytes|               model|
+--------------+--------------------+
|14000519643136|       ST14000NM001G|
|12000138625024|       ST12000NM001G|
| 8001563222016|        ST8000NM0055|
| 8001563222016|        ST8000NM0055|
|14000519643136| TOSHIBA MG07ACA14TA|
| 4000787030016|HGST HMS5C4040BLE640|
| 8001563222016|         ST8000DM002|
|14000519643136| TOSHIBA MG07ACA14TA|
|14000519643136| TOSHIBA MG07ACA14TA|
|12000138625024|       ST12000NM0008|
|12000138625024|       ST12000NM001G|
|12000138625024|       ST12000NM001G|
|12000138625024|       ST12000NM001G|
|14000519643136|       ST14000NM001G|
|12000138625024|       ST12000NM001G|
|12000138625024|       ST12000NM001G|
|12000138625024|       ST12000NM001G|
|14000519643136| TOSHIBA MG07ACA14TA|
|12000138625024|       ST12000NM0008|
|12000138625024|       ST12000NM0008|
+--------------+--------------------+
only showing top 20 rows



In [10]:
new_df.sort(F.col('capacity_bytes').desc()).show()

+--------------+-------------+
|capacity_bytes|        model|
+--------------+-------------+
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
|18000207937536|ST18000NM000J|
+--------------+-------------+
only showing top 20 rows



In [11]:
df.select('capacity_bytes').distinct().count()

15

In [12]:
new_df = new_df.withColumn('storage_ranking', F.rank().over(Window.orderBy(F.col('capacity_bytes').desc())))

In [13]:
new_df.select('storage_ranking', 'capacity_bytes').show()

+---------------+--------------+
|storage_ranking|capacity_bytes|
+---------------+--------------+
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
|              1|18000207937536|
+---------------+--------------+
only showing top 20 rows



In [17]:
df = df.join(new_df.select('capacity_bytes', 'storage_ranking'), on='capacity_bytes', how='left')

5. Create a column called `primary_key` that is hash of columns that make a record unique in this dataset.

In [None]:
df = df.withColumn('primary_key', F.hash(*[F.col(c) for c in df.columns]))