# Section B: Spark `DataFrame` and SQL

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, udf, col
from pyspark.sql.types import StringType, IntegerType, ArrayType, DataType

In [2]:
# Build (or reuse) the Spark session for this notebook: fix the master URL and
# application name first, then apply each configuration option in turn.
session_builder = (
    SparkSession.builder
    .master("spark://host-192-168-2-70:7077")
    .appName("MJEnrico_A3_B")
)
session_options = {
    "spark.dynamicAllocation.enabled": True,
    "spark.dynamicAllocation.shuffleTracking.enabled": True,
    "spark.dynamicAllocation.executorIdleTimeout": "30s",
    "spark.shuffle.service.enabled": True,
    "spark.cores.max": 4,
}
for option_name, option_value in session_options.items():
    session_builder = session_builder.config(option_name, option_value)

spark_session = session_builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/16 13:04:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Part 1: Loading

In [3]:
# https://spark.apache.org/docs/latest/sql-data-sources-csv.html

# Read the citations CSV from HDFS; the first line holds the column names and
# fields are comma-separated. No schema is supplied here.
citations_path = "hdfs://192.168.2.70:9000/parking-citations.csv"
csv_reader = spark_session.read.option("header", True).option("delimiter", ",")

df = csv_reader.csv(citations_path)
df.show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

### Part 2: Printing

In [4]:
# Report the Python type of `df`, then the column names and types Spark inferred.
print("Type of object: {}".format(type(df)))
df.printSchema()

Type of object: <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullab

### Part 3: Row count

In [5]:
# Number of rows in the loaded citations DataFrame.
df.count()

                                                                                

13077724

### Part 4: Partition count

In [6]:
# Number of partitions of the RDD underlying `df`.
df.rdd.getNumPartitions()

16

### Part 5: Drop columns

In [7]:
# Remove the columns that are not used in the rest of this section and confirm
# the result via the schema.
columns_to_drop = ["VIN", "Latitude", "Longitude"]
df = df.drop(*columns_to_drop)
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



### Part 6: Getting max value

In [8]:
# "Fine amount" was read from the CSV as a string; replace it with a float
# column so it can be aggregated numerically.
fine_as_float = col("Fine amount").cast("float")
df = df.withColumn("Fine amount", fine_as_float)
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [9]:
# Largest fine in the data set. `max` here is the pyspark.sql.functions
# aggregate imported in the first cell, not Python's builtin.
highest_fine = df.select(max(col("Fine amount")))
highest_fine.show()



+----------------+
|max(Fine amount)|
+----------------+
|          1100.0|
+----------------+



                                                                                

### Part 7: Aggregate by car maker

In [10]:
# Count citations per car make, most frequent first. The column is referenced
# by its exact schema name ("Make"), matching the filter used later for TOYT,
# so the query does not rely on Spark's case-insensitive column resolution and
# the output header matches the schema.
df.groupBy("Make").count().orderBy("count", ascending=False).show()



+----+-------+
|make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



                                                                                

### Part 8: New column from other column

In [11]:
# Lookup table from the short color codes found in the "Color" column to
# readable color names. Several codes share a name (e.g. BN and BR are both
# "Brown"); codes that are absent here have no expansion.
COLORS = dict(
    AL="Aluminum",
    AM="Amber",
    BG="Beige",
    BK="Black",
    BL="Blue",
    BN="Brown",
    BR="Brown",
    BZ="Bronze",
    CH="Charcoal",
    DK="Dark",
    GD="Gold",
    GO="Gold",
    GN="Green",
    GY="Gray",
    GT="Granite",
    IV="Ivory",
    LT="Light",
    OL="Olive",
    OR="Orange",
    MR="Maroon",
    PK="Pink",
    RD="Red",
    RE="Red",
    SI="Silver",
    SL="Silver",
    SM="Smoke",
    TN="Tan",
    VT="Violet",
    WT="White",
    WH="White",
    YL="Yellow",
    YE="Yellow",
    UN="Unknown",
)

In [12]:
# https://medium.com/@ayplam/developing-pyspark-udfs-d179db0ccc87

# The decorator turns the plain Python function into a Spark user-defined
# function that returns a string column.
@udf(returnType=StringType())
def color_expander(short_col):
    """Return the full color name for a short color code.

    The code is looked up in ``COLORS``. When there is no (non-empty) entry
    for it, including when the input is ``None``, the input is returned as is.
    """
    return COLORS.get(short_col) or short_col

In [13]:
# Append a "Color long" column holding the expanded name of each "Color" code.
expanded_color = color_expander(df["Color"])
df = df.withColumn("Color long", expanded_color)
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)
 |-- Color long: string (nullable = true)



In [14]:
# Spot-check the code -> name mapping on a small random sample of rows
# (without replacement, fraction 0.00001). The fixed seed makes the sample
# repeatable on re-runs over the same input instead of changing every time.
SAMPLE_SEED = 42
(
    df.select("Color", "Color long")
    .sample(withReplacement=False, fraction=0.00001, seed=SAMPLE_SEED)
    .show()
)

[Stage 12:>                                                         (0 + 2) / 2]

+-----+----------+
|Color|Color long|
+-----+----------+
|   OT|        OT|
|   BK|     Black|
|   BK|     Black|
|   WT|     White|
|   GY|      Gray|
|   SI|    Silver|
|   WT|     White|
|   BK|     Black|
|   GY|      Gray|
|   GN|     Green|
|   RD|       Red|
|   WT|     White|
|   GY|      Gray|
|   WT|     White|
|   WH|     White|
|   BK|     Black|
|   TN|       Tan|
|   GY|      Gray|
|   GY|      Gray|
|   BK|     Black|
+-----+----------+
only showing top 20 rows



                                                                                

### Part 9: Color scheme count for TOYT

In [15]:
# Color distribution for one make: keep only TOYT rows, count them per
# expanded color name, and list the most common colors first.
toyt_rows = df.filter(col("Make") == "TOYT")
toyt_color_counts = toyt_rows.groupBy("Color long").count()
df_TOYT_color_long = toyt_color_counts.orderBy("count", ascending=False)

df_TOYT_color_long.show(25)



+----------+------+
|Color long| count|
+----------+------+
|      Gray|489697|
|     White|434595|
|     Black|353812|
|    Silver|347894|
|      Blue|180091|
|       Red|119074|
|     Green| 74968|
|      Gold| 40646|
|    Maroon| 26242|
|       Tan| 23355|
|     Beige| 15723|
|        OT| 15719|
|     Brown| 11454|
|    Yellow|  4372|
|        PR|  4272|
|    Orange|  3575|
|   Unknown|  2012|
|        TU|  1647|
|      null|   771|
|        CO|   730|
|      Pink|   117|
|        BU|     1|
|        TA|     1|
+----------+------+



                                                                                

In [16]:
# Stop the Spark session now that the notebook is finished.
spark_session.stop()