In [36]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col
from pyspark.sql.functions import desc
from pyspark.sql.functions import udf

In [2]:
# Spark settings applied to this application, in the order they are set on the builder:
# dynamic executor allocation (idle executors released after 30s) and a cap of 4 cores.
spark_conf = {
    "spark.dynamicAllocation.enabled": True,
    "spark.dynamicAllocation.shuffleTracking.enabled": True,
    "spark.shuffle.service.enabled": True,
    "spark.dynamicAllocation.executorIdleTimeout": "30s",
    "spark.cores.max": 4,
}

# Connect to the standalone cluster master under a fixed application name.
session_builder = SparkSession.builder.master("spark://192.168.2.70:7077").appName("Sepehr_3B")
for conf_key, conf_value in spark_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark_session = session_builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/22 19:11:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
#1. Load the parking-citations CSV from HDFS (first line holds the column names) and preview it.
pc_df = spark_session.read.csv(
    "hdfs://192.168.2.70:9000/parking-citations.csv",
    header=True,
)
pc_df.show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [6]:
# Inspect column names and types. No schema or inferSchema option was given when loading,
# so every column comes back as a string.
pc_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [7]:
#3. Count the number of rows in the CSV file.
# count() runs a Spark job over the whole file; keep the result so it is computed only once.
rows_count = pc_df.count()
print('Number of rows in the Parking Citation CSV file: {}'.format(rows_count))



Number of rows in the Parking Citation CSV file: 13077724


                                                                                

In [9]:
#4. Count the number of partitions in the underlying RDD.
# The DataFrame's RDD view reports how many partitions currently back pc_df.
partitions_count = pc_df.rdd.getNumPartitions()
print('Number of partitions in the Parking Citation CSV file: {}'.format(partitions_count))

Number of partitions in the Parking Citation CSV file: 16


In [10]:
#5. Drop the columns VIN, Latitude and Longitude.
# drop() returns a new DataFrame; rebinding pc_df makes every later cell use the narrower frame.
pc_df = pc_df.drop(*("VIN", "Latitude", "Longitude"))

In [14]:
#6. Find the maximum fine amount. How many fines have this amount?

# 'Fine amount' is loaded as a string column; cast it to float so that max() compares
# numerically instead of lexicographically.
pc_df = pc_df.withColumn("Fine amount", pc_df["Fine amount"].cast("float"))

# Largest fine: the aggregation yields a single row whose first field is the max.
max_fine_amount = pc_df.agg({"Fine amount": "max"}).first()[0]
print("Maximum fine amount: $", max_fine_amount)

# Number of citations issued for exactly that amount.
num_max_fines = pc_df.where(col("Fine amount") == max_fine_amount).count()
print("Number of fines with maximum fine amount: ", num_max_fines)

                                                                                

Maximum fine amount: $ 1100.0




Number of fines with maximum fine amount:  626


                                                                                

# orderBy method
The orderBy method in PySpark is used to sort the rows of a DataFrame (the Scala/Java API also has Datasets) based on one or more columns. The method takes one or more column names as arguments and returns a new DataFrame sorted by the specified columns, in ascending order by default.

Here is a step-by-step overview of how the orderBy method works:

The method takes one or more column names (or Column expressions) as arguments; these specify the columns used for sorting.

If multiple columns are specified, the DataFrame is sorted first by the values in the first column, then by the values in the second column, and so on.

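By default, the sorting is performed in ascending order. To sort in descending order, you can use the desc() method on the column, like this: df.orderBy(col("column_name").desc()). Equivalent forms are the desc() function, df.orderBy(desc("column_name")), and the ascending argument, df.orderBy("column_name", ascending=False).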

The orderBy method returns a new DataFrame or Dataset that contains the same rows as the original, but with the rows sorted based on the specified columns.

The orderBy method does not modify the original DataFrame or Dataset. Instead, it returns a new one that you can use for further processing.

In summary, the orderBy method is a powerful tool for sorting data in PySpark, allowing you to easily sort your data by one or more columns in either ascending or descending order.






# withColumn() method
withColumn() is a method in PySpark's DataFrame API that allows you to add a new column to an existing DataFrame or replace an existing column. The method takes two arguments:

colName: The name of the new column you want to add or the name of the column you want to replace.
col: The expression or value that you want to use to compute the values of the new column.

DataFrame.withColumn(colName, col)

In [16]:
# 7. Show the top 20 most frequent vehicle makes, and their frequencies.

def freq(column_name, n, df=None):
    """Display the ``n`` most frequent values of ``column_name`` with their counts.

    Args:
        column_name: name of the column whose distinct values are counted.
        n: number of rows passed to ``DataFrame.show``.
        df: DataFrame to analyse. Defaults to the notebook-level ``pc_df``, which is
            looked up at call time, so later reassignments of ``pc_df`` are honoured.
    """
    source_df = pc_df if df is None else df
    # One row per distinct value, with the value's frequency in the "count" column.
    freq_df = source_df.groupBy(column_name).count().orderBy(desc("count"))
    freq_df.show(n)

In [18]:
freq(column_name="Make", n=20)



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



                                                                                

# UDF
UDF stands for User-Defined Function. In the context of PySpark, a UDF is a way to define a custom function that can be applied to a PySpark DataFrame or column. UDFs allow you to use your own code to perform operations on DataFrame columns that are not supported by built-in PySpark functions.

When defining a UDF in PySpark, you write a regular Python function and wrap it with the udf() function, optionally passing a return type (the default is StringType). The Scala and Java APIs have their own equivalents. Once wrapped, you can apply the UDF to a DataFrame column, for example with the withColumn() method.

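UDFs are useful when you need to apply a custom transformation to a PySpark DataFrame or column, especially when the built-in PySpark functions are not sufficient for your use case. A Python UDF sends every row to a Python worker process and back, so it is usually slower than an equivalent built-in function. Prefer built-ins whenever they can express the transformation.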

In [39]:
# Abbreviated vehicle colour codes mapped to full colour names. Defined once at module
# level so the UDF below does not rebuild this dictionary for every row it processes.
COLOR_NAMES = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black', 'BL': 'Blue',
    'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze', 'CH': 'Charcoal', 'DK': 'Dark',
    'GD': 'Gold', 'GO': 'Gold', 'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite',
    'IV': 'Ivory', 'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',
    'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White', 'WH': 'White',
    'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown'
}


def map_color(color):
    """Expand an abbreviated vehicle colour code into its full colour name.

    Args:
        color: raw code from the ``Color`` column (e.g. ``'BK'``); may be None.

    Returns:
        The full name (e.g. ``'Black'``) when the code is in ``COLOR_NAMES``;
        otherwise the input unchanged, so unknown codes and None pass through as-is.
    """
    # dict.get(key, default) returns `default` when `key` is missing.
    return COLOR_NAMES.get(color, color)

# Wrap map_color as a Spark UDF (return type left at udf()'s default).
map_color_udf = udf(f=map_color)

# Add a "color long" column holding the expanded colour name for every citation.
pc_df = pc_df.withColumn("color long", map_color_udf(col("Color")))

In [56]:
def freq_color_brand(brand_name, n, df=None):
    """Print the ``n`` most frequent full colour names for a single vehicle make.

    Args:
        brand_name: value of the ``Make`` column to restrict to (e.g. ``'TOYT'``).
        n: how many colours to print, most frequent first.
        df: DataFrame to analyse. Defaults to the notebook-level ``pc_df`` (looked up
            at call time); it must contain the ``Make`` and ``color long`` columns.
    """
    source_df = pc_df if df is None else df
    # Restrict to the requested make first, then count per colour; grouping by Make
    # too is redundant once only one make remains.
    top_colors = (
        source_df.filter(col("Make") == brand_name)
        .groupBy("color long")
        .count()
        .orderBy("count", ascending=False)
        .take(n)
    )
    label = 'Color' if n == 1 else 'Colors'
    print(f'{n} Most Frequent {label} in {brand_name}:')
    for color, count in top_colors:
        print(f'{color}:{count}')

In [57]:
freq_color_brand(brand_name='TOYT', n=1)



1 Most Frequent Color in TOYT:
Gray:489697


                                                                                

In [58]:
# Shut down the Spark session created at the top of the notebook.
spark_session.stop()