In [29]:
try:
  import pyspark
except ImportError:
  !pip install pyspark
  import pyspark

import os
import shutil
import time
import zipfile
import pandas as pd

from io import BytesIO, StringIO
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as f

In [52]:
def createSession(appName: str = "PySpark") -> SparkSession:
  """Build a SparkSession for this notebook.

  Args:
    appName: Application name passed to the session builder.

  Returns:
    The result of ``getOrCreate()`` on a builder configured with ``appName``.
  """
  builder = SparkSession.builder.appName(appName)
  return builder.getOrCreate()

def extractCSVBytes(pair):
  """Return the text of the first ``.csv`` member found in a zip archive.

  Args:
    pair: A ``(path, content)`` tuple; ``content`` is the raw bytes of a zip
      archive. The path element is not used.

  Returns:
    The UTF-8 decoded contents of the first archive member whose name ends
    with ``.csv``, or ``None`` when the archive has no such member.
  """
  _path, content = pair
  # The archive is read entirely from memory; the context manager closes it
  # as soon as the member has been extracted.
  with zipfile.ZipFile(BytesIO(content)) as archive:
    for name in archive.namelist():
      if name.endswith(".csv"):
        return archive.read(name).decode("utf-8")
  return None

def createDataFrame(z, spark: SparkSession):
  """Turn an RDD of zipped CSV files into a DataFrame.

  Args:
    z: RDD of ``(path, content)`` pairs, as consumed by ``extractCSVBytes``.
    spark: Session whose CSV reader parses the extracted lines.

  Returns:
    The DataFrame read from the extracted lines, with the first line used as
    the header and the schema inferred.
  """
  started_at = time.time()
  print("\nCreating dataframe...")

  # Archives without a CSV member yield None and are dropped before the
  # remaining documents are exploded into individual lines.
  csv_lines = (
      z.map(extractCSVBytes)
      .filter(lambda text: text is not None)
      .flatMap(lambda text: text.split("\n"))
  )

  frame = spark.read.csv(csv_lines, header=True, inferSchema=True)
  elapsed = round(time.time() - started_at, 2)
  print(f"Dataframe created in {elapsed} seconds.")
  return frame

def copyFile(src: str, dest: str):
  """Copy ``src`` to ``dest`` unless ``dest`` already exists as a file.

  The parent directory of ``dest`` is created first when it is missing.
  Progress and timings are printed along the way.

  Args:
    src: Path of the file to copy.
    dest: Path (including file name) to copy the file to.

  Raises:
    OSError: Propagated from ``os.makedirs`` or ``shutil.copy`` when the
      directory cannot be created or the file cannot be copied.
  """
  initial_time = time.time()
  print(f"\nCopying file from  \"{src}\" to \"{dest}\"...")
  # dirname rather than string replacement: a file name that also appears as
  # a directory segment must not be stripped out of the middle of the path.
  folder_path = os.path.dirname(dest)
  # An empty folder_path means dest is a bare file name in the working
  # directory, so there is nothing to create.
  if folder_path and not os.path.exists(folder_path):
    init_time_2 = time.time()
    print(f"Creating directory {folder_path}")
    os.makedirs(folder_path, exist_ok=True)
    print(f"Directory created in {round(time.time() - init_time_2, 2)} seconds.")
  if os.path.isfile(dest):
    print("File already exists, skipping copying process...")
  else:
    # Pure-Python copy: no shell interpolation of the paths, and a failed
    # copy raises instead of being reported as a success.
    shutil.copy(src, dest)
    print(f"File copied from \"{src}\" to \"{dest}\" in {round(time.time() - initial_time, 2)} seconds.")

def mountDrive():
  """Mount Google Drive at ``/content/drive``.

  ``google.colab`` is imported inside the function, so the module is only
  required when this function is actually called.
  """
  from google.colab import drive as colab_drive
  mount_point = '/content/drive'
  colab_drive.mount(mount_point)

In [56]:
def _addFileMetadata(df: "DataFrame", file_name: str) -> "DataFrame":
  """Add ``source_file``, ``file_date`` and ``brand`` unless one of them already exists."""
  derived_columns = ("source_file", "file_date", "brand")
  if any(name in df.columns for name in derived_columns):
    return df

  model_tokens = f.split(f.col("model"), " ")
  return (
      df
      .withColumn("source_file", f.lit(file_name))
      # The date is the first yyyy-mm-dd pattern found in the file name.
      .withColumn(
          "file_date",
          f.to_date(f.regexp_extract("source_file", r"(\d{4}-\d{2}-\d{2})", 1))
      )
      # Multi-word model names contribute their first word as the brand;
      # single-word names are labelled "unknown".
      .withColumn(
          "brand",
          f.when(f.size(model_tokens) > 1, model_tokens[0]).otherwise("unknown")
      )
  )


def _rankModelsByCapacity(df: "DataFrame") -> "DataFrame":
  """Return one row per model with its average capacity and a descending rank."""
  # The window has no partitionBy because the rank is taken over all models.
  window = Window.orderBy(f.col("total_capacity").desc())
  return (
      df
      .groupBy("model")
      # Despite its name, total_capacity holds the per-model AVERAGE of
      # capacity_bytes; the column name is kept for compatibility.
      .agg(f.avg("capacity_bytes").alias("total_capacity"))
      .withColumn("storage_ranking", f.rank().over(window))
  )


def _findUniqueColumns(df: "DataFrame", total_rows: int) -> list:
  """Print distinct-value statistics per column and return the columns with no repeated value."""
  unique_columns = []
  print("Percentage of unique rows in each column of the dataframe: ")
  for column in df.columns:
    unique_vals = df.select(column).distinct().count()
    percentage = round(unique_vals / total_rows * 100, 2) if total_rows else 0.0
    # Compare raw counts, not the rounded percentage: 99.9995% rounds to
    # 100.0 and would wrongly mark a column holding a duplicate as unique.
    if total_rows and unique_vals == total_rows:
      unique_columns.append(column)

    print(f"- {column}: {unique_vals} ({percentage}%)")
  return unique_columns


def _addPrimaryKey(df: "DataFrame", unique_columns: list) -> "DataFrame":
  """Add a SHA-256 ``primary_key`` column derived from the given columns."""
  if len(unique_columns) == 1:
    # Cast so that a non-string unique column can be hashed as well; the cast
    # is a no-op for string columns.
    key_source = f.col(unique_columns[0]).cast("string")
  else:
    key_source = f.concat_ws("||", *unique_columns)
  return df.withColumn("primary_key", f.sha2(key_source, 256))


def main():
  """Load the zipped hard-drive CSV into Spark, enrich it and derive a primary key.

  Steps:
    1. Mount Google Drive and copy the archive to local storage.
    2. Read the archive into a cached DataFrame.
    3. Add the ``source_file``, ``file_date`` and ``brand`` columns.
    4. Rank models by average ``capacity_bytes`` and join the ranking back
       onto every row.
    5. Print the distinct-value count of every column and hash the columns
       without repeated values into a ``primary_key`` column.
    6. Show the first 10 rows of the result.
  """
  mountDrive()
  spark: SparkSession = createSession()
  sc = spark.sparkContext

  root = "/content"
  d_loc = f"{root}/drive/MyDrive"
  d_loc = f"{d_loc}/Colab Notebooks/Data Engineering practice exercises"
  d_loc = f"{d_loc}/Exercise #7"
  file_folder = "/data"
  file_name = "hard-drive-2022-01-01-failures.csv.zip"
  local_path = root + file_folder + "/" + file_name

  copyFile(
      src = d_loc + file_folder + "/" + file_name,
      dest = local_path
  )

  # Read exactly the path copyFile wrote, expressed as a local-filesystem URI
  # (file:///content/data/...).
  rdd_zip = sc.binaryFiles(f"file://{local_path}")
  df = createDataFrame(rdd_zip, spark).cache()
  print()

  df = _addFileMetadata(df, file_name)
  df = df.join(_rankModelsByCapacity(df), on="model", how="left")

  total_rows = df.count()
  unique_rows = _findUniqueColumns(df, total_rows)

  if unique_rows:
    df = _addPrimaryKey(df, unique_rows)
    # Verify that primary_key column has all-unique values. Only columns were
    # added since total_rows was computed, so the row count is unchanged.
    all_unique = df.select("primary_key").distinct().count() == total_rows
    print("\nAre all values in the primary_key column unique? ", all_unique)
  else:
    # Without a unique column there is no primary_key to verify.
    print("No columns with all-unique values were found!")
  df.show(10)


# Run the pipeline only when this module is the entry point.
if __name__ == "__main__":
  main()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

Copying file from  "/content/drive/MyDrive/Colab Notebooks/Data Engineering practice exercises/Exercise #7/data/hard-drive-2022-01-01-failures.csv.zip" to "/content/data/hard-drive-2022-01-01-failures.csv.zip"...
File already exists, skipping copying process...

Creating dataframe...
Dataframe created in 8.25 seconds.

Percentage of unique rows in each column of the dataframe: 
- model: 66 (0.03%)
- date: 1 (0.0%)
- serial_number: 206954 (100.0%)
- capacity_bytes: 15 (0.01%)
- failure: 2 (0.0%)
- smart_1_normalized: 63 (0.03%)
- smart_1_raw: 102081 (49.33%)
- smart_2_normalized: 32 (0.02%)
- smart_2_raw: 85 (0.04%)
- smart_3_normalized: 126 (0.06%)
- smart_3_raw: 3859 (1.86%)
- smart_4_normalized: 13 (0.01%)
- smart_4_raw: 321 (0.16%)
- smart_5_normalized: 48 (0.02%)
- smart_5_raw: 625 (0.3%)
- smart_7_normalized: 53 (0.03%)
- smart_7_raw: 102123 (49.35%)
- 