In [0]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession  # Needed to create the Spark session
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import PipelineModel

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import sleep

# Folder holding the collision CSV files; used by every reader in this notebook.
inputPath = "/FileStore/tables/"


# Explicit schema for the collision CSV files, declared up front to speed up
# processing. Every column is nullable.
csvSchema = (
    StructType()
    # When and where the crash happened
    .add("CRASH DATE", StringType(), True)
    .add("CRASH TIME", StringType(), True)
    .add("BOROUGH", StringType(), True)
    .add("ZIP CODE", StringType(), True)
    .add("LATITUDE", FloatType(), True)
    .add("LONGITUDE", FloatType(), True)
    .add("LOCATION", StringType(), True)
    .add("ON STREET NAME", StringType(), True)
    .add("CROSS STREET NAME", StringType(), True)
    .add("OFF STREET NAME", StringType(), True)
    # Overall totals are read as floats, the per-category counters as integers
    .add("NUMBER OF PERSONS INJURED", FloatType(), True)
    .add("NUMBER OF PERSONS KILLED", FloatType(), True)
    .add("NUMBER OF PEDESTRIANS INJURED", IntegerType(), True)
    .add("NUMBER OF PEDESTRIANS KILLED", IntegerType(), True)
    .add("NUMBER OF CYCLIST INJURED", IntegerType(), True)
    .add("NUMBER OF CYCLIST KILLED", IntegerType(), True)
    .add("NUMBER OF MOTORIST INJURED", IntegerType(), True)
    .add("NUMBER OF MOTORIST KILLED", IntegerType(), True)
    # Contributing factors, one column per vehicle slot
    .add("CONTRIBUTING FACTOR VEHICLE 1", StringType(), True)
    .add("CONTRIBUTING FACTOR VEHICLE 2", StringType(), True)
    .add("CONTRIBUTING FACTOR VEHICLE 3", StringType(), True)
    .add("CONTRIBUTING FACTOR VEHICLE 4", StringType(), True)
    .add("CONTRIBUTING FACTOR VEHICLE 5", StringType(), True)
    .add("COLLISION_ID", IntegerType(), True)
    # Vehicle types, one column per vehicle slot
    .add("VEHICLE TYPE CODE 1", StringType(), True)
    .add("VEHICLE TYPE CODE 2", StringType(), True)
    .add("VEHICLE TYPE CODE 3", StringType(), True)
    .add("VEHICLE TYPE CODE 4", StringType(), True)
    .add("VEHICLE TYPE CODE 5", StringType(), True)
)

# Replacement values for nulls, keyed by column name (passed to na.fill()).
# Start with the columns that have their own specific placeholder.
default_values = {
    "CRASH DATE": "1900-01-01",
    "CRASH TIME": "00:00",
    "ZIP CODE": "00000",
}

# Float-typed columns fall back to 0.0.
default_values.update(
    dict.fromkeys(
        [
            "LATITUDE",
            "LONGITUDE",
            "NUMBER OF PERSONS INJURED",
            "NUMBER OF PERSONS KILLED",
        ],
        0.0,
    )
)

# Integer-typed counters and the collision id fall back to 0.
default_values.update(
    dict.fromkeys(
        [
            "NUMBER OF PEDESTRIANS INJURED",
            "NUMBER OF PEDESTRIANS KILLED",
            "NUMBER OF CYCLIST INJURED",
            "NUMBER OF CYCLIST KILLED",
            "NUMBER OF MOTORIST INJURED",
            "NUMBER OF MOTORIST KILLED",
            "COLLISION_ID",
        ],
        0,
    )
)

# Every remaining text column falls back to "Unknown".
default_values.update(
    dict.fromkeys(
        [
            "BOROUGH",
            "LOCATION",
            "ON STREET NAME",
            "CROSS STREET NAME",
            "OFF STREET NAME",
        ]
        + [f"CONTRIBUTING FACTOR VEHICLE {slot}" for slot in range(1, 6)]
        + [f"VEHICLE TYPE CODE {slot}" for slot in range(1, 6)],
        "Unknown",
    )
)



In [0]:
# Obtain the Spark session for this notebook (reuses an existing one if present).
spark = (
    SparkSession.builder
        .appName("DataStreamSimulation")
        .getOrCreate()
)

In [0]:
# Streaming read of the CSV folder with the predefined schema
# (maxFilesPerTrigger=1); nulls are replaced with the configured defaults.
streamingInputDF = (
    spark.readStream
        .schema(csvSchema)
        .options(header="true", maxFilesPerTrigger=1)
        .csv(inputPath)
        .fillna(default_values)
)

In [0]:
# Batch read of the same CSV folder: apply the schema, replace nulls, then keep
# a reproducible 50% sample (no replacement, fixed seed).
staticInputDF = (
    spark.read
        .schema(csvSchema)
        .options(header="true")
        .csv(inputPath)
        .fillna(default_values)
        .sample(fraction=0.5, seed=1234, withReplacement=False)
)

display(staticInputDF)

# Configure the memory sink under the query name "collisions" (the name the
# %sql cells select from), triggering every 10 seconds, then start the query.
collisionsWriter = (
    streamingInputDF.writeStream
        .queryName("collisions")
        .format("memory")
        .trigger(processingTime="10 seconds")
)
query = collisionsWriter.start()

CRASH DATE,CRASH TIME,BOROUGH,ZIP CODE,LATITUDE,LONGITUDE,LOCATION,ON STREET NAME,CROSS STREET NAME,OFF STREET NAME,NUMBER OF PERSONS INJURED,NUMBER OF PERSONS KILLED,NUMBER OF PEDESTRIANS INJURED,NUMBER OF PEDESTRIANS KILLED,NUMBER OF CYCLIST INJURED,NUMBER OF CYCLIST KILLED,NUMBER OF MOTORIST INJURED,NUMBER OF MOTORIST KILLED,CONTRIBUTING FACTOR VEHICLE 1,CONTRIBUTING FACTOR VEHICLE 2,CONTRIBUTING FACTOR VEHICLE 3,CONTRIBUTING FACTOR VEHICLE 4,CONTRIBUTING FACTOR VEHICLE 5,COLLISION_ID,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,VEHICLE TYPE CODE 3,VEHICLE TYPE CODE 4,VEHICLE TYPE CODE 5
06/29/2022,6:55,Unknown,0,0.0,0.0,Unknown,THROGS NECK BRIDGE,Unknown,Unknown,0.0,0.0,0,0,0,0,0,0,Following Too Closely,Unspecified,Unknown,Unknown,Unknown,4541903,Sedan,Pick-up Truck,Unknown,Unknown,Unknown
09/11/2021,9:35,BROOKLYN,11208,40.667202,-73.8665,"(40.667202, -73.8665)",Unknown,Unknown,1211 LORING AVENUE,0.0,0.0,0,0,0,0,0,0,Unspecified,Unknown,Unknown,Unknown,Unknown,4456314,Sedan,Unknown,Unknown,Unknown,Unknown
04/14/2021,12:47,Unknown,0,0.0,0.0,Unknown,MAJOR DEEGAN EXPRESSWAY RAMP,Unknown,Unknown,0.0,0.0,0,0,0,0,0,0,Unspecified,Unspecified,Unknown,Unknown,Unknown,4407458,Dump,Sedan,Unknown,Unknown,Unknown
12/14/2021,8:17,BRONX,10475,40.86816,-73.83148,"(40.86816, -73.83148)",Unknown,Unknown,344 BAYCHESTER AVENUE,2.0,0.0,0,0,0,0,2,0,Unspecified,Unspecified,Unknown,Unknown,Unknown,4486660,Sedan,Sedan,Unknown,Unknown,Unknown
12/14/2021,16:50,QUEENS,11413,40.675884,-73.75577,"(40.675884, -73.75577)",SPRINGFIELD BOULEVARD,EAST GATE PLAZA,Unknown,0.0,0.0,0,0,0,0,0,0,Turning Improperly,Unspecified,Unknown,Unknown,Unknown,4487127,Sedan,Station Wagon/Sport Utility Vehicle,Unknown,Unknown,Unknown
12/14/2021,23:10,QUEENS,11434,40.66684,-73.78941,"(40.66684, -73.78941)",NORTH CONDUIT AVENUE,150 STREET,Unknown,2.0,0.0,0,0,0,0,2,0,Reaction to Uninvolved Vehicle,Unspecified,Unknown,Unknown,Unknown,4486635,Sedan,Sedan,Unknown,Unknown,Unknown
12/14/2021,20:03,BROOKLYN,11226,40.65068,-73.95881,"(40.65068, -73.95881)",Unknown,Unknown,878 FLATBUSH AVENUE,4.0,0.0,0,0,0,0,4,0,Steering Failure,Unknown,Unknown,Unknown,Unknown,4486991,Sedan,Unknown,Unknown,Unknown,Unknown
12/11/2021,4:45,MANHATTAN,10001,40.748917,-73.993546,"(40.748917, -73.993546)",Unknown,Unknown,232 WEST 30 STREET,0.0,0.0,0,0,0,0,0,0,Following Too Closely,Unspecified,Unknown,Unknown,Unknown,4486905,Station Wagon/Sport Utility Vehicle,Unknown,Unknown,Unknown,Unknown
12/13/2021,6:30,QUEENS,11372,40.75373,-73.88505,"(40.75373, -73.88505)",82 STREET,34 AVENUE,Unknown,0.0,0.0,0,0,0,0,0,0,Unspecified,Unknown,Unknown,Unknown,Unknown,4486967,Sedan,Unknown,Unknown,Unknown,Unknown
12/14/2021,3:43,Unknown,0,40.804375,-73.93742,"(40.804375, -73.93742)",LEXINGTON AVENUE,Unknown,Unknown,1.0,0.0,1,0,0,0,0,0,Unspecified,Unknown,Unknown,Unknown,Unknown,4486304,Station Wagon/Sport Utility Vehicle,Unknown,Unknown,Unknown,Unknown


# Conteo de filas


In [0]:
%sql
-- Number of rows currently in the collisions table
SELECT COUNT(*)
FROM collisions

count(1)
486147


# Media y desviación estándar del número de personas heridas


In [0]:

%sql
-- Mean and standard deviation of the persons-injured count over all rows
SELECT
  AVG(`NUMBER OF PERSONS INJURED`) AS avg_persons_injured,
  STDDEV(`NUMBER OF PERSONS INJURED`) AS stddev_persons_injured
FROM collisions;


# La fecha más antigua y más reciente en el conjunto de datos


In [0]:
%sql
-- Earliest and latest crash date.
-- CRASH DATE is read as an MM/dd/yyyy string, so MIN/MAX on the raw column would
-- compare text (month first) rather than calendar dates; parse it to a date first.
-- Rows carrying the '1900-01-01' null placeholder are excluded so the placeholder
-- never reaches the date parser.
SELECT
  MIN(to_date(`CRASH DATE`, 'MM/dd/yyyy')) AS min_date,
  MAX(to_date(`CRASH DATE`, 'MM/dd/yyyy')) AS max_date
FROM collisions
WHERE `CRASH DATE` <> '1900-01-01';

# La diferencia promedio entre el número de ciclistas heridos y el número de peatones heridos


In [0]:
%sql
-- Average of (cyclists injured - pedestrians injured) over all rows
SELECT
  AVG(`NUMBER OF CYCLIST INJURED` - `NUMBER OF PEDESTRIANS INJURED`) AS avg_injured_difference
FROM collisions;

# La proporción de colisiones con más de un cierto umbral de personas heridas


In [0]:
%sql
-- Fraction of rows whose persons-injured count is above the threshold of 1
SELECT
  AVG(IF(`NUMBER OF PERSONS INJURED` > 1, 1, 0)) AS high_injury_ratio
FROM collisions;

# Calcular el porcentaje de colisiones con al menos una persona fallecida por distrito


In [0]:
%sql
-- Share of collisions with at least one person killed, per borough.
-- AVG over a 0/1 flag yields a fraction in [0, 1]; it is scaled by 100 so the
-- value matches the fatality_percentage column name.
SELECT
  `BOROUGH`,
  100.0 * AVG(CASE WHEN `NUMBER OF PERSONS KILLED` > 0 THEN 1 ELSE 0 END) AS fatality_percentage
FROM collisions
GROUP BY `BOROUGH`;

In [0]:
%sql
-- Show every column of the rows currently held in the collisions table
SELECT * FROM collisions

In [0]:
# Batch read of the CSV folder with null replacement. This rebinds staticInputDF
# to a smaller sample than the one taken in the earlier cell (0.2 instead of 0.5).
staticInputDF = (
    spark.read
        .option("header", "true")
        .schema(csvSchema)
        .csv(inputPath)
        .na.fill(default_values)
        .sample(withReplacement=False, fraction=0.2, seed=1234)  # Keep a 20% sample (no replacement, fixed seed) of the static dataset

)

In [0]:
# Target column the classifier learns to predict.
label = "NUMBER OF PERSONS INJURED"

# Predictor columns. The label column itself is deliberately NOT listed: feeding
# the target to the model as an input leaks the answer and makes the reported
# accuracy meaningless.
# NOTE(review): the pedestrian/cyclist/motorist "INJURED" counters presumably add
# up to the label and may still leak it — confirm whether they should stay.
features = [
    "LATITUDE", "LONGITUDE",
    "NUMBER OF PERSONS KILLED",
    "NUMBER OF PEDESTRIANS INJURED",
    "NUMBER OF PEDESTRIANS KILLED",
    "NUMBER OF CYCLIST INJURED",
    "NUMBER OF CYCLIST KILLED",
    "NUMBER OF MOTORIST INJURED",
    "NUMBER OF MOTORIST KILLED"
]

# Assemble the predictor columns into a single vector column.
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Index the label values into class ids. handleInvalid="keep" routes label values
# that were not seen while fitting into an extra bucket instead of raising, so the
# test split and later streaming batches cannot fail on a rare injury count.
indexer = StringIndexer(inputCol=label, outputCol="label", handleInvalid="keep")

# Split into training and test sets (80/20, fixed seed).
# Requires staticInputDF to be defined by the preceding cell; the NameError recorded
# in this cell's output presumably came from running it before that cell.
(trainingData, testData) = staticInputDF.randomSplit([0.8, 0.2], seed=1234)

# Random forest classifier; the seed makes training reproducible across runs.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, seed=1234)

# Chain the feature assembly, label indexing and classifier into one pipeline.
pipeline = Pipeline(stages=[assembler, indexer, rf])

# Fit the whole pipeline on the training split.
model = pipeline.fit(trainingData)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-226267395860929>:21[0m
[1;32m     18[0m indexer [38;5;241m=[39m StringIndexer(inputCol[38;5;241m=[39mlabel, outputCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mlabel[39m[38;5;124m"[39m)
[1;32m     20[0m [38;5;66;03m# Dividir los datos en conjuntos de entrenamiento y prueba[39;00m
[0;32m---> 21[0m (trainingData, testData) [38;5;241m=[39m staticInputDF[38;5;241m.[39mrandomSplit([[38;5;241m0.8[39m, [38;5;241m0.2[39m], seed[38;5;241m=[39m[38;5;241m1234[39m)
[1;32m     23[0m [38;5;66;03m# Crear el clasificador de bosque aleatorio[39;00m
[1;32m     24[0m rf [38;5;241m=[39m RandomForestClassifier(labelCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mlabel[39m[38;5;124m"[39m, featuresCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mfeatures[39m[38;5;124m"[39m, 

In [0]:
# Score the held-out split with the fitted pipeline and report accuracy.
predictions = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Persist the fitted pipeline. overwrite() allows this cell to be re-run:
# a plain save() raises when the target path already exists.
model.write().overwrite().save("/FileStore/models/random_forest_model")


In [0]:

# Streaming read of the CSV folder with null replacement.
# Sampling is intentionally not applied here: Spark rejects sample() on a
# streaming DataFrame, so the 50% sample is taken per micro-batch inside
# predict_batch, where each batch is an ordinary (static) DataFrame.
streamingInputDF = (
    spark
        .readStream
        .option("maxFilesPerTrigger", 1)
        .option("header", "true")
        .schema(csvSchema)
        .csv(inputPath)
        .na.fill(default_values)
)

# Holds the saved pipeline model once it has been loaded, so that it is read
# from storage a single time rather than on every micro-batch.
_loadedModels = {}


def _get_streaming_model():
    """Return the saved pipeline model, loading it from storage on first use only."""
    if "random_forest" not in _loadedModels:
        _loadedModels["random_forest"] = PipelineModel.load("/FileStore/models/random_forest_model")
    return _loadedModels["random_forest"]


def predict_batch(batchDF, batchId):
    """Apply the saved model to one micro-batch and print its predictions.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Identifier of the micro-batch (supplied by foreachBatch; unused).
    """
    # Keep 50% of the batch (no replacement, fixed seed).
    sampledDF = batchDF.sample(withReplacement=False, fraction=0.5, seed=1234)

    # head(1) is enough to detect an empty batch; no full count is needed.
    if not sampledDF.head(1):
        return

    result = _get_streaming_model().transform(sampledDF)
    result.select("features", "label", "prediction").show()


# Run predict_batch on every micro-batch of the stream.
query = (
    streamingInputDF
        .writeStream
        .foreachBatch(predict_batch)
        .start()
)

# Block this cell until the streaming query stops.
query.awaitTermination()