This project, developed for the Big Data course at Roma Tre University, analyzes the
dataset at https://www.kaggle.com/new-york-city/nyc-parking-tickets .
The dataset contains the parking tickets issued by the NYC police between August 2013 and June 2017. The goal of the analysis is to extract information through queries on specific conditions and then store the results in a NoSQL database.
Finally, the LogisticRegression Machine Learning technique is used to predict the registration state of the vehicles most at risk of being ticketed.



In [2]:
# Imports for the project
from pyspark import SparkConf
from pyspark.sql import SparkSession
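# Explicit imports avoid shadowing Python builtins such as sum, max and round
from pyspark.sql.functions import col, count, when, year, month, dayofmonth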
from cassandra.cluster import Cluster
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


We connect to the Cassandra database, which runs in a Docker container on localhost, port 9042. The name of the keyspace defined in Apache Cassandra is passed to connect() to obtain the session.

In [3]:
cas_cluster = Cluster(['127.0.0.1'], port=9042)
session = cas_cluster.connect('mykeyspace')

Next, we set the configuration parameters that the Spark session needs at startup.

In [3]:
sparkConf=SparkConf().setMaster("local[*]")\
                     .setAppName("project")\
                     .setAll([('spark.executor.memory','16g'),
                            ('spark.executor.cores','4'),
                            ('spark.cores.max','8'),
                            ('spark.sql.debug.maxToStringFields','1000'),
                            ('spark.driver.memory','24g'),
                            ('spark.driver.maxResultSize','16g')])

In [4]:
spark=SparkSession.builder.config(conf=sparkConf).getOrCreate()

We load the .csv file produced by the cleaning step in cleaning.py.

In [5]:
nyc_tickets = spark.read.csv(
    'nyc_tickets_13_17.csv',
    header=True,
    inferSchema=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
)


In [7]:
nyc_tickets.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: double (nullable = true)
 |-- Violation Precinct: double (nullable = true)
 |-- Issuer Precinct: double (nullable = true)
 |-- Issuer Code: double (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation County: s

In [8]:
nyc_tickets.count()

42332687

We check whether any columns in the dataset contain too many null values.

In [10]:
nyc_tickets.select([count(when(col(c).isNull(), c)).alias(c) for c in nyc_tickets.columns]).show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+----------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+------------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Precinct|Issuer Precinct|Is

The fields "Time First Observed", "Intersecting Street", "Violation Legal Code", "Unregistered Vehicle?", "Meter Number", "No Standing or Stopping Violation", "Hydrant Violation", "Double Parking Violation" and "Census Tract" contain too many null values, so they are dropped from the dataset.
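
The notebook picks the columns to drop by reading the null counts above. As a rough illustration of the same decision made programmatically, here is a minimal pure-Python sketch on a handful of made-up rows. The sample values, the helper names and the 0.5 threshold are all assumptions for illustration; the notebook itself does not state a threshold.

```python
# Hypothetical sample rows; None marks a missing value.
sample_rows = [
    {"Plate ID": "ABC123", "Meter Number": None, "Census Tract": None},
    {"Plate ID": "XYZ789", "Meter Number": None, "Census Tract": None},
    {"Plate ID": None, "Meter Number": "143-3785", "Census Tract": None},
    {"Plate ID": "JKL456", "Meter Number": None, "Census Tract": None},
]


def null_ratios(rows):
    """Return the fraction of missing values for every column."""
    total = len(rows)
    return {c: sum(1 for r in rows if r[c] is None) / total for c in rows[0]}


def columns_to_drop(rows, threshold=0.5):
    """List the columns whose null ratio exceeds the (assumed) threshold."""
    return [c for c, ratio in null_ratios(rows).items() if ratio > threshold]


ratios = null_ratios(sample_rows)
to_drop = columns_to_drop(sample_rows, threshold=0.5)
print(ratios)
print(to_drop)
```

On the real DataFrame the per-column null counts would be divided by the total row count in the same way.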

In [6]:
nyc_tickets = nyc_tickets.drop(
    "Time First Observed", "Intersecting Street", "Violation Legal Code",
    "Unregistered Vehicle?", "Meter Number", "No Standing or Stopping Violation",
    "Hydrant Violation", "Double Parking Violation", "Census Tract",
)
nyc_tickets.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: double (nullable = true)
 |-- Violation Precinct: double (nullable = true)
 |-- Issuer Precinct: double (nullable = true)
 |-- Issuer Code: double (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Violation County: string (nullable = true)
 |-- Violation In Front Of 

The 'Issue Date' field is split into three columns: 'Year', 'Month' and 'Day'.

In [7]:
# Cast the string column to a date, then extract its components
nyc_tickets = nyc_tickets.withColumn("Issue Date", col("Issue Date").cast("date"))
nyc_tickets = nyc_tickets.withColumn("Year", year(col("Issue Date")))
nyc_tickets = nyc_tickets.withColumn("Month", month(col("Issue Date")))
nyc_tickets = nyc_tickets.withColumn("Day", dayofmonth(col("Issue Date")))
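
To make the date split concrete, the sketch below does the same thing for a single value in plain Python. It assumes the cleaned file stores dates as ISO `YYYY-MM-DD` strings, which is what the character positions used in the cell above suggest; `split_issue_date` is a hypothetical helper, not part of the project.

```python
from datetime import date


def split_issue_date(text):
    """Parse an ISO 'YYYY-MM-DD' string into (year, month, day).

    Returns None when the value is missing or not in ISO format,
    mirroring how a failed cast to date yields a null.
    """
    try:
        parsed = date.fromisoformat(text)
    except (TypeError, ValueError):
        return None
    return parsed.year, parsed.month, parsed.day


parts = split_issue_date("2013-08-04")
bad = split_issue_date("08/04/2013")
print(parts, bad)
```

Values that do not parse come back as None, so it is worth checking for nulls in the three new columns after the split.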

In [8]:

# Keep only the rows that have a plate ID
nyc_tickets = nyc_tickets.where(col("Plate ID").isNotNull())

The final state of our dataset:

In [14]:
nyc_tickets.printSchema()
nyc_tickets.show(5)

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: date (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: double (nullable = true)
 |-- Violation Precinct: double (nullable = true)
 |-- Issuer Precinct: double (nullable = true)
 |-- Issuer Code: double (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Violation County: string (nullable = true)
 |-- Violation In Front Of Or

In [15]:
# createOrReplaceTempView lets the cell be re-run without a "view already exists" error
nyc_tickets.createOrReplaceTempView("nycTable")

We count the tickets for each vehicle registration state, showing which states the most-ticketed cars come from.

In [16]:
states_tickets=spark.sql("SELECT `Registration State` AS registration_state, count(*) AS state_tickets FROM nycTable GROUP BY registration_state ORDER BY state_tickets DESC LIMIT 50")
states_tickets.show(5)

+------------------+-------------+
|registration_state|state_tickets|
+------------------+-------------+
|                NY|     32950575|
|                NJ|      3853393|
|                PA|      1069048|
|                CT|       583468|
|                FL|       543868|
+------------------+-------------+
only showing top 5 rows
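
The query above follows a pattern that recurs throughout this notebook: group by one column, count the rows in each group, sort by count in descending order and keep the top results. A minimal pure-Python sketch of that pattern on a made-up list of states (the sample values are assumptions for illustration):

```python
from collections import Counter

# Hypothetical registration states, one per ticket.
sample_states = ["NY", "NJ", "NY", "PA", "NY", "NJ", "CT"]

# Counter groups and counts; most_common(n) sorts by count descending and limits to n.
top_states = Counter(sample_states).most_common(2)
print(top_states)
```

`most_common(2)` plays the role of `ORDER BY ... DESC LIMIT 2` in the SQL version.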



Once the query has run, its result is stored on the Cassandra cluster.

In [17]:
session.execute("CREATE TABLE IF NOT EXISTS states_tickets (ind int primary key,registration_state varchar, state_tickets int)")
stmt = session.prepare("INSERT INTO states_tickets (ind, registration_state, state_tickets) VALUES (?, ?, ?)")

for ind, item in states_tickets.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['registration_state'], item['state_tickets']])
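
The tables in this notebook store counts in CQL `int` columns in some cases and `bigint` in others. CQL `int` is a 32-bit signed integer and `bigint` a 64-bit one, so a quick range check shows which type a column of counts needs. The helper below is a hypothetical sketch that assumes non-negative counts; it is not part of the project or of the Cassandra driver.

```python
INT32_MAX = 2**31 - 1  # largest value of a CQL int
INT64_MAX = 2**63 - 1  # largest value of a CQL bigint


def cql_integer_type(values):
    """Pick the narrowest CQL integer type that can hold the largest value."""
    largest = max(values)
    if largest <= INT32_MAX:
        return "int"
    if largest <= INT64_MAX:
        return "bigint"
    return "varint"


# Largest per-state counts shown in the output above.
state_counts = [32950575, 3853393, 1069048, 583468, 543868]
chosen = cql_integer_type(state_counts)
print(chosen)
```

For the counts shown above `int` is sufficient; `bigint` leaves headroom if the dataset grows.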

We analyze which vehicle body types receive the most tickets.

In [20]:
number_tickets_bt=spark.sql("SELECT `Vehicle Body Type` AS vehicle_body_type, count(*) AS number_of_tickets_for_the_vehicle_body_type FROM nycTable GROUP BY vehicle_body_type ORDER BY number_of_tickets_for_the_vehicle_body_type DESC LIMIT 50")
number_tickets_bt.show(5)

+-----------------+-------------------------------------------+
|vehicle_body_type|number_of_tickets_for_the_vehicle_body_type|
+-----------------+-------------------------------------------+
|             SUBN|                                   13708699|
|             4DSD|                                   11922806|
|              VAN|                                    6022624|
|             DELV|                                    3065094|
|              SDN|                                    1826268|
+-----------------+-------------------------------------------+
only showing top 5 rows



In [21]:
session.execute("CREATE TABLE IF NOT EXISTS number_tickets_bt (ind int primary key,vehicle_body_type varchar, number_of_tickets_for_the_vehicle_body_type bigint)")
stmt = session.prepare("INSERT INTO number_tickets_bt (ind, vehicle_body_type, number_of_tickets_for_the_vehicle_body_type) VALUES (?, ?, ?)")

for ind, item in number_tickets_bt.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['vehicle_body_type'], item['number_of_tickets_for_the_vehicle_body_type']])

We find the most common violation types.

In [22]:
freq_violation=spark.sql("SELECT `Violation Code` AS violation_code, count(*) AS number_of_tickets_violation FROM nycTable GROUP BY violation_code ORDER BY number_of_tickets_violation DESC LIMIT 50")
freq_violation.show(5)

+--------------+---------------------------+
|violation_code|number_of_tickets_violation|
+--------------+---------------------------+
|            21|                    5929420|
|            38|                    4867892|
|            14|                    3608660|
|            36|                    3583082|
|            37|                    2817819|
+--------------+---------------------------+
only showing top 5 rows



In [23]:
session.execute("CREATE TABLE IF NOT EXISTS freq_violation (ind int primary key,violation_code int , number_of_tickets_violation int)")
stmt = session.prepare("INSERT INTO freq_violation (ind, violation_code, number_of_tickets_violation) VALUES (?, ?, ?)")

for ind, item in freq_violation.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['violation_code'], item['number_of_tickets_violation']])


We find the precincts where the most tickets are issued.

In [24]:
violation_precinct=spark.sql("SELECT `Violation Precinct` AS violation_precinct, count(*) AS ticket_frequency FROM nycTable GROUP BY violation_precinct ORDER BY ticket_frequency DESC LIMIT 50")
violation_precinct.show(5)

+------------------+----------------+
|violation_precinct|ticket_frequency|
+------------------+----------------+
|               0.0|         6411256|
|              19.0|         2185007|
|              18.0|         1514670|
|              14.0|         1449738|
|               1.0|         1234312|
+------------------+----------------+
only showing top 5 rows



In [25]:
session.execute("CREATE TABLE IF NOT EXISTS violation_precinct (ind int primary key,violation_precinct float, ticket_frequency float )")
stmt = session.prepare("INSERT INTO violation_precinct (ind, violation_precinct, ticket_frequency) VALUES (?, ?, ?)")

for ind, item in violation_precinct.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['violation_precinct'], item['ticket_frequency']])

We select the most-ticketed vehicles over the years:

In [26]:
most_tickets=spark.sql("SELECT `Plate ID` AS plate, count(*) AS plate_tickets FROM nycTable GROUP BY plate ORDER BY plate_tickets DESC LIMIT 50")
most_tickets.show(5)

+----------+-------------+
|     plate|plate_tickets|
+----------+-------------+
|BLANKPLATE|        56686|
|   47603MD|         4003|
|   49781MA|         3534|
|   2028685|         3497|
|    AN917T|         3312|
+----------+-------------+
only showing top 5 rows



In [27]:
session.execute("CREATE TABLE IF NOT EXISTS most_tickets (ind int primary key,plate varchar, plate_tickets int)")
stmt = session.prepare("INSERT INTO most_tickets (ind, plate, plate_tickets) VALUES (?, ?, ?)")

for ind, item in most_tickets.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['plate'], item['plate_tickets']])

The most-ticketed vehicle makes:

In [28]:
common_makers = spark.sql("SELECT `Vehicle Make` AS vehicle_maker, count(*) AS maker_ticket_freq FROM nycTable GROUP BY vehicle_maker ORDER BY maker_ticket_freq DESC LIMIT 50")
common_makers.show(5)

+-------------+-----------------+
|vehicle_maker|maker_ticket_freq|
+-------------+-----------------+
|         FORD|          5374953|
|        TOYOT|          4460841|
|        HONDA|          4004792|
|        NISSA|          3315479|
|        CHEVR|          3114838|
+-------------+-----------------+
only showing top 5 rows



In [29]:
session.execute("CREATE TABLE IF NOT EXISTS common_makers (ind int primary key,vehicle_maker varchar, maker_ticket_freq int)")
stmt = session.prepare("INSERT INTO common_makers (ind, vehicle_maker, maker_ticket_freq) VALUES (?, ?, ?)")

for ind, item in common_makers.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['vehicle_maker'], item['maker_ticket_freq']])


For each year, we compute the total number of tickets recorded:

In [30]:
best_year=spark.sql("SELECT `Year` AS year, count(*) AS total_year FROM nycTable GROUP BY `Year` ORDER BY total_year DESC LIMIT 50")
best_year.show(5)

+----+----------+
|year|total_year|
+----+----------+
|2015|  11736901|
|2014|  10536290|
|2016|  10240094|
|2017|   5433075|
|2013|   4377492|
+----+----------+



In [31]:
session.execute("CREATE TABLE IF NOT EXISTS best_year (ind int primary key,year int, total_year int)")
stmt = session.prepare("INSERT INTO best_year (ind, year, total_year) VALUES (?, ?, ?)")

for ind, item in best_year.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['year'], item['total_year']])

For each calendar month, aggregated over the whole period under analysis, we compute the total number of tickets recorded.

In [32]:
best_month=spark.sql("SELECT `Month` AS month, count(*) AS total_month FROM nycTable GROUP BY `Month` ORDER BY total_month DESC LIMIT 50")
best_month.show(12)

+-----+-----------+
|month|total_month|
+-----+-----------+
|   10|    3985513|
|    3|    3851127|
|    5|    3838998|
|    1|    3808758|
|    9|    3756397|
|    6|    3709627|
|    4|    3600577|
|   11|    3481936|
|    8|    3454190|
|   12|    3054589|
|    2|    3053541|
|    7|    2728599|
+-----+-----------+



In [34]:
session.execute("CREATE TABLE IF NOT EXISTS best_month (ind int primary key,month int, total_month int)")
stmt = session.prepare("INSERT INTO best_month (ind, month, total_month) VALUES (?, ?, ?)")

for ind, item in best_month.toPandas().iterrows():
    results = session.execute(stmt, [ind, item['month'], item['total_month']])
    

We now clean the dataset further before moving on to the Machine Learning analysis.

In [9]:
nyc_tickets=nyc_tickets.withColumn("Violation Precinct",nyc_tickets["Violation Precinct"].cast("int"))
nyc_tickets=nyc_tickets.withColumn("Issuer Precinct",nyc_tickets["Issuer Precinct"].cast("int"))

In [10]:
ml_nyc = nyc_tickets.select(['Registration State', 'Day', 'Year', 'Month', 'Plate Type', 'Issuer Precinct', 'Violation Precinct', 'Issuing Agency'])
ml_nyc.show()
ml_nyc.printSchema()
ml_nyc.count()

+------------------+---+----+-----+----------+---------------+------------------+--------------+
|Registration State|Day|Year|Month|Plate Type|Issuer Precinct|Violation Precinct|Issuing Agency|
+------------------+---+----+-----+----------+---------------+------------------+--------------+
|                NY|  4|2013|    8|       PAS|             33|                33|             P|
|                NY|  4|2013|    8|       COM|             33|                33|             P|
|                NY|  5|2013|    8|       COM|             33|                33|             P|
|                NY|  5|2013|    8|       COM|             33|                33|             P|
|                NY|  8|2013|    8|       COM|             33|                33|             P|
|                NJ| 11|2013|    8|       PAS|             33|                33|             P|
|                NY| 11|2013|    8|       PAS|             33|                33|             P|
|                NJ|  7|2013| 

42323852

In [11]:
ml_nyc=ml_nyc.na.drop()


For the Machine Learning analysis we use only rows without null values. The check below confirms that no nulls remain in any of the selected columns.

In [38]:
ml_nyc.select([count(when(col(c).isNull(), c)).alias(c) for c in ml_nyc.columns]).show()



+------------------+---+----+-----+----------+---------------+------------------+--------------+
|Registration State|Day|Year|Month|Plate Type|Issuer Precinct|Violation Precinct|Issuing Agency|
+------------------+---+----+-----+----------+---------------+------------------+--------------+
|                 0|  0|   0|    0|         0|              0|                 0|             0|
+------------------+---+----+-----+----------+---------------+------------------+--------------+



Columns that contain strings must be converted to numeric form. StringIndexer is applied first, followed by OneHotEncoder.

In [12]:
cat_col = ['Plate Type', 'Issuing Agency']
stages = []
for categoricalCol in cat_col:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
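
To show what these two stages do to a column, here is a minimal pure-Python sketch on made-up plate types. It assumes the default settings: StringIndexer gives index 0 to the most frequent value (ties broken alphabetically here), and OneHotEncoder drops the last category so that it is encoded as an all-zero vector. Spark emits sparse vectors, while the sketch uses plain lists. The function names are hypothetical.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; the last category maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Hypothetical sample of the 'Plate Type' column.
plate_types = ["PAS", "COM", "PAS", "OMT", "PAS", "COM"]
mapping = fit_string_index(plate_types)
encoded = [one_hot(mapping[v], len(mapping)) for v in plate_types]
print(mapping)
print(encoded[:4])
```

Dropping the last category avoids a redundant column, since its value is implied when every other position is zero.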

In [13]:
label_stringIdx = StringIndexer(inputCol='Registration State', outputCol="label")
stages += [label_stringIdx]

We build the vector of features to use. Only the two encoded categorical columns ('Plate Type' and 'Issuing Agency') are included.

In [14]:
assemblerInputs = [c + "classVec" for c in cat_col]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


We run the stages listed above as a pipeline:

In [15]:
half_Pipeline = Pipeline().setStages(stages)

In [16]:
pipelineModel = half_Pipeline.fit(ml_nyc)

In [17]:
final_ml_nyc = pipelineModel.transform(ml_nyc)

In [18]:
selection = ["label", "features"]
dataset = final_ml_nyc.select(selection)

In [19]:
(train, test) = dataset.randomSplit([0.7, 0.3], seed=100)
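
The split above assigns each row to train or test at random, so the 70/30 proportions are approximate rather than exact, and the seed makes the result repeatable. The sketch below illustrates the idea in plain Python; it is not Spark's actual sampler, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, train_weight=0.7, seed=100):
    """Send each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)
    train_part, test_part = [], []
    for row in rows:
        (train_part if rng.random() < train_weight else test_part).append(row)
    return train_part, test_part


sample = list(range(1000))
train_rows, test_rows = random_split(sample)
print(len(train_rows), len(test_rows))
```

Calling the function again with the same seed returns the same partition, which keeps later evaluation numbers reproducible.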

To make predictions we use Logistic Regression: we first train the model on the training data just generated, then make predictions on the test data.

In [20]:
logreg = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
logregModel = logreg.fit(train)

In [23]:
predictions = logregModel.transform(test)

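Model performance (area under the ROC curve, the default metric of BinaryClassificationEvaluator). Since the label, the registration state, has more than two classes, this binary metric should be read with caution: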

In [24]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.6623773421608148

Model accuracy:

In [25]:
# Accuracy: Evaluate model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName = 'accuracy')
evaluator.evaluate(predictions)

0.7798278859368312
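
An accuracy figure is easier to interpret next to a majority-class baseline, that is, the score of a classifier that always predicts the most common label. The sketch below computes both on made-up labels, then derives the share of NY tickets from two numbers printed earlier in this notebook (32950575 NY tickets out of 42323852 rows). The share in the actual test split may differ slightly, since rows with nulls were dropped before training. The helper names are hypothetical.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of predictions that match the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def majority_baseline(labels):
    """Accuracy obtained by always predicting the most frequent label."""
    return Counter(labels).most_common(1)[0][1] / len(labels)


# Hypothetical labels and predictions.
true_states = ["NY", "NY", "NJ", "NY", "PA"]
predicted_states = ["NY", "NY", "NY", "NY", "NY"]
acc = accuracy(true_states, predicted_states)
base = majority_baseline(true_states)

# Share of NY tickets, using the counts shown earlier in the notebook.
ny_share = 32950575 / 42323852
print(acc, base, round(ny_share, 4))
```

With an NY share of roughly 0.78, the accuracy reported above is close to what always predicting NY would achieve, so it says little about how well the other states are predicted.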