In [1]:
! pip install pyspark
! pip install findspark
! pip install folium
! pip install geopy
! pip install scikit-learn



In [41]:
import os
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, regexp_replace, split, col, size, array_contains, isnan, when, count, array, reverse, udf
from pyspark.sql.types import ArrayType, StringType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
import numpy as np
import pandas as pd
import sklearn
from sklearn.impute import SimpleImputer
from scipy.stats import kurtosis, skew
import math
import folium
from geopy.geocoders import Nominatim
from itertools import chain

In [3]:
findspark.init()

In [4]:
spark_url = 'local'
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark Project')\
        .getOrCreate()
spark.conf.set("spark.sql.csv.parser.multiLine", "true")

In [5]:
path = 'bangkok_traffy.csv'
df = spark.read.option("multiLine", "true").csv(path, header=True, inferSchema=True)
df.show(5)

+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+--------------------+--------------------+--------------+----+------------+--------------------+
|  ticket_id|               type|        organization|             comment|               photo|         photo_after|            coords|             address|subdistrict|district|            province|           timestamp|         state|star|count_reopen|       last_activity|
+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+--------------------+--------------------+--------------+----+------------+--------------------+
|2021-9LHDM6|                 {}|                null|            ไม่มีภาพ|https://storage.g...|                null|100.48661,13.79386|1867 จรัญสนิทวงศ์...|    บางพลัด| บางพล

In [6]:
null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts.show()

+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+
|ticket_id|type|organization|comment|photo|photo_after|coords|address|subdistrict|district|province|timestamp|state|  star|count_reopen|last_activity|
+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+
|     2413|1550|        2640|   3911| 2089|      85624|  2019|   4433|       2089|    2092|    2393|     2233| 2026|164087|      120042|         2558|
+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+



In [7]:
# Filter the DataFrame to include only rows with null values in the "column_name" column
def check_first_null(filtered_df):
    # Check if the filtered DataFrame is empty
    if filtered_df.count() == 0:
        return f"No null values in {filtered_df}."
    else:
        first_row = filtered_df.head()
        return first_row
    
ticket_id_null_df = df.filter(df.ticket_id.isNull())
coords_null_df = df.filter(df.coords.isNull())
address_null_df = df.filter(df.address.isNull())
print(check_first_null(ticket_id_null_df), "\n")
print(check_first_null(coords_null_df), "\n")
print(check_first_null(address_null_df), "\n")

Row(ticket_id=None, type='{ถนน}', organization='สำนักงาน ป.ป.ท.,เขตจอมทอง,ฝ่ายเทศกิจ เขตจอมทอง,ผอ.เขตจอมทอง (นายณัฐพงษ์),กลุ่มกรุงธนเหนือ (รองปลัดฯ เฉลิมพล)', comment=None, photo='https://storage.googleapis.com/traffy_public_bucket/TeamChadChart/corruption_photo2.png', photo_after='https://storage.googleapis.com/traffy_public_bucket/attachment/2022-06/e9596093de70ae8abacd6574f26a2d0f4466fe9f.jpg', coords='100.45568,13.69103', address=None, subdistrict='บางขุนเทียน', district='จอมทอง', province='กรุงเทพมหานคร', timestamp='2022-06-09 23:34:34.98044+00', state='เสร็จสิ้น', star='5', count_reopen=None, last_activity='2022-06-10 11:02:34.607728+00') 

Row(ticket_id='2022-7DABXT', type='{สะพาน}', organization=None, comment='"เคยดีใจมีสายสีน้ำเงินสถานี""แยกไฟฉาย""', photo=None, photo_after=None, coords=None, address=None, subdistrict=None, district=None, province=None, timestamp=None, state=None, star=None, count_reopen=None, last_activity=None) 

Row(ticket_id='2022-7DABXT', type='{สะพาน}', 

From above cells give us the first row with null value from each column selected (ticket_id, coords, address) to tell some relationship of those null value.
1. The ticket_id is null when the state='เสร็จสิ้น', it's mean we can drop this column significantly.
2. The address has null value 2 times more than coords. In the first null row we can see both of them are null. So it might tell that if no coords, no address too and not vice versa. We'll check in next step. 

In [8]:
sub = ['coords', 'address']
df = df.dropna(how='all', subset=sub)
sub_null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sub])
sub_null_counts.show()

+------+-------+
|coords|address|
+------+-------+
|     1|   2415|
+------+-------+



As we can see, the null coords most dissapear. So, we don't need to map any address to coords and we can also drop all remained null column.

In [9]:
df = df.dropna(how='all', subset=['coords'])
df = df.dropna(how='all', subset=['address'])

From this step, we'll use df_use to be a data for ML

In [10]:
# change state to boolean 
df_use = df.withColumn('state', when(df.state == 'เสร็จสิ้น', 1).otherwise(0))

# change type to list
df_use = df_use.withColumn("type", split(regexp_replace("type", "[{}]", ""), ","))
df_use = df_use.dropna(how='all', subset=['type'])

# change coords to pair and swap them into format [latitude, longtitude]
flatten = udf(lambda x: list(chain.from_iterable(x)), ArrayType(StringType()))
df_use = df_use.withColumn('coords', array(reverse(split(df.coords, ','))))
df_use = df_use.withColumn('coords', flatten('coords'))
df_use.show(5)

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------+--------------------+--------------------+-----+----+------------+--------------------+
|  ticket_id|                type|        organization|             comment|               photo|         photo_after|              coords|             address|subdistrict|district|            province|           timestamp|state|star|count_reopen|       last_activity|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------+--------------------+--------------------+-----+----+------------+--------------------+
|2021-9LHDM6|                  []|                null|            ไม่มีภาพ|https://storage.g...|                null|[13.79386, 100.48...|1867 จรัญสนิทวงศ์...|    บางพลัด| บางพลัด|       กรุงเ

In [58]:
df_ex = df_use.filter(size(col('type')) == 1)
df_ex.show(5)

+-----------+-----------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+-----------+--------+--------------------+--------------------+-----+----+------------+--------------------+
|  ticket_id|       type|        organization|             comment|               photo|photo_after|              coords|             address|subdistrict|district|            province|           timestamp|state|star|count_reopen|       last_activity|
+-----------+-----------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+-----------+--------+--------------------+--------------------+-----+----+------------+--------------------+
|2021-9LHDM6|         []|                null|            ไม่มีภาพ|https://storage.g...|       null|[13.79386, 100.48...|1867 จรัญสนิทวงศ์...|    บางพลัด| บางพลัด|       กรุงเทพมหานคร|2021-09-01 10:44:...|    0|null|        null|2022-02-22 04:59:.

In [65]:
first_element = udf(lambda x: x[0], StringType())
df_exploded = df_use.withColumn('type', first_element(df_use['type']))
df_exploded = df_exploded.filter("type != ''")

df_exploded.show()

+-----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+-----------------+--------------------+--------------------+-----+----+------------+--------------------+
|  ticket_id|       type|        organization|             comment|               photo|         photo_after|              coords|             address|   subdistrict|         district|            province|           timestamp|state|star|count_reopen|       last_activity|
+-----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+-----------------+--------------------+--------------------+-----+----+------------+--------------------+
|2021-FYJTFP|  ความสะอาด|          เขตบางซื่อ|             ขยะเยอะ|https://storage.g...|                null|[13.81865, 100.53...|12/14 ถนน กรุงเทพ...|          null|             null|

In [66]:
df_t = df_exploded.withColumn("latitude", col("coords")[0]) \
       .withColumn("longitude", col("coords")[1]) \
       .drop("coords")
df_t = df_t.withColumn("latitude", col("latitude").cast('double'))
df_t = df_t.withColumn("longitude", col("longitude").cast('double'))

In [67]:
null_latitude = df_t.filter(df_t.latitude.isNull())
null_longitude = df_t.filter(df_t.longitude.isNull())
print(check_first_null(null_latitude), "\n")
print(check_first_null(null_longitude), "\n")

Row(ticket_id='จักขอบคุณอย่างยิ่ง"', type='https://storage.googleapis.com/traffy_public_bucket/attachment/2022-02/5d20ecb864e916af948b4090ce9405be6f0b2f04.jpg', organization=None, comment='100.46930,13.75503', photo='1186 ถ. พรานนก แขวง บ้านช่างหล่อ เขตบางกอกน้อย กรุงเทพมหานคร 10700 ประเทศไทย', photo_after='บางขุนศรี', address='กรุงเทพมหานคร', subdistrict='2022-02-06 16:59:19.251528+00', district='กำลังดำเนินการ', province=None, timestamp=None, state=0, star=None, count_reopen=None, last_activity=None, latitude=None, longitude=None) 

Row(ticket_id='จักขอบคุณอย่างยิ่ง"', type='https://storage.googleapis.com/traffy_public_bucket/attachment/2022-02/5d20ecb864e916af948b4090ce9405be6f0b2f04.jpg', organization=None, comment='100.46930,13.75503', photo='1186 ถ. พรานนก แขวง บ้านช่างหล่อ เขตบางกอกน้อย กรุงเทพมหานคร 10700 ประเทศไทย', photo_after='บางขุนศรี', address='กรุงเทพมหานคร', subdistrict='2022-02-06 16:59:19.251528+00', district='กำลังดำเนินการ', province=None, timestamp=None, state=0, s

In [68]:
df_t = df_t.dropna(how='all', subset=['latitude'])
df_t = df_t.dropna(how='all', subset=['longitude'])
df_t = df_t.dropna(how='all', subset=['subdistrict'])

In [16]:
stages = []
categoricalAttributes = ['type']
for columnName in categoricalAttributes:
    stringIndexer = StringIndexer(inputCol=columnName, outputCol=columnName+ "Index")
    stages.append(stringIndexer)
    oneHotEncoder = OneHotEncoder(inputCol=columnName+ "Index", outputCol=columnName + "Vec")
    stages.append(oneHotEncoder)
    
categoricalCols = [s + "Vec" for s in categoricalAttributes]

numericColumns = ['latitude', 'longitude']

allFeatureCols = numericColumns + categoricalCols
vectorAssembler = VectorAssembler(
    inputCols=allFeatureCols,
    outputCol="features")
stages.append(vectorAssembler)

stages

[StringIndexer_6f8215f805b0,
 OneHotEncoder_bb3d82c4692f,
 VectorAssembler_737f6f55c8fa]

In [17]:
df_select = df_t.select(['type', 'subdistrict', 'latitude', 'longitude'])
train_df, test_df = df_select.randomSplit([0.8,0.2])

In [18]:
featurePipeline = Pipeline(stages=stages)
featureOnlyModel = featurePipeline.fit(train_df)

In [19]:
trainingFeaturesDf = featureOnlyModel.transform(train_df)
testFeaturesDf = featureOnlyModel.transform(test_df)
trainingFeaturesDf.show(1)

+-----+-----------+--------+---------+---------+---------------+--------------------+
| type|subdistrict|latitude|longitude|typeIndex|        typeVec|            features|
+-----+-----------+--------+---------+---------+---------------+--------------------+
|PM2.5| กระทุ่มราย|13.85535|100.86528|     17.0|(23,[17],[1.0])|(25,[0,1,19],[13....|
+-----+-----------+--------+---------+---------+---------------+--------------------+
only showing top 1 row



In [20]:
trainingFeaturesDf.select("features", 'latitude', 'longitude').rdd.take(5)

[Row(features=SparseVector(25, {0: 13.8554, 1: 100.8653, 19: 1.0}), latitude=13.85535, longitude=100.86528),
 Row(features=SparseVector(25, {0: 13.8563, 1: 100.863, 19: 1.0}), latitude=13.85633, longitude=100.86304),
 Row(features=SparseVector(25, {0: 13.6974, 1: 100.8515, 19: 1.0}), latitude=13.69736, longitude=100.8515),
 Row(features=SparseVector(25, {0: 13.7018, 1: 100.8542, 19: 1.0}), latitude=13.70184, longitude=100.85423),
 Row(features=SparseVector(25, {0: 13.7547, 1: 100.8514, 19: 1.0}), latitude=13.75473, longitude=100.85137)]

In [21]:
rf = RandomForestClassifier(numTrees=100, maxDepth=5, labelCol='subdistrict', featuresCol='features')

In [22]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol='type', featuresCol='features')
dtPipeline = Pipeline(stages=[dt])

In [23]:
dtPipelineModel = dtPipeline.fit(trainingFeaturesDf)

IllegalArgumentException: requirement failed: Column type must be of type numeric but was actually of type string.

In [69]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
type_indexer = StringIndexer(inputCol='type', outputCol='label')
df_indexed = type_indexer.fit(df_t).transform(df_t)

# Create a feature vector
assembler = VectorAssembler(inputCols=['latitude', 'longitude'], outputCol='features')
df_features = assembler.transform(df_indexed)

# Split data into training and test sets
train_data, test_data = df_features.randomSplit([0.7, 0.3], seed=112)

# Train Random Forest classifier
rf = RandomForestClassifier(numTrees=100, maxDepth=5, labelCol='label', featuresCol='features')
rf_model = rf.fit(train_data)

# Evaluate performance on test set
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
predictions = rf_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

Accuracy: 0.2828463697250392


In [70]:
df_select = df_t.select(['type', 'latitude', 'longitude', 'subdistrict'])

indexer = StringIndexer(inputCol='type', outputCol='label')

subdistrict_indexer = StringIndexer(inputCol='subdistrict', outputCol='subdistrict'+'indexed')

assembler = VectorAssembler(inputCols=['latitude', 'longitude', 'subdistrict'], outputCol='features')
pipeline = Pipeline(stages=[indexer, assembler])
preprocessed_data = pipeline.fit(df_select).transform(df_select)

# Split data into training and testing sets
train_data, test_data = preprocessed_data.randomSplit([0.7, 0.3], seed=112)

# Train Decision Tree classifier
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', maxDepth=5)
dt_model = dt.fit(train_data)

# Make predictions on test set
predictions = dt_model.transform(test_data)

# Evaluate performance
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

IllegalArgumentException: Data type string of column subdistrict is not supported.

In [71]:
df_select = df_t.select(['type', 'latitude', 'longitude'])

indexer = StringIndexer(inputCol='type', outputCol='label')
assembler = VectorAssembler(inputCols=['latitude', 'longitude'], outputCol='features')
pipeline = Pipeline(stages=[indexer, assembler])
preprocessed_data = pipeline.fit(df_select).transform(df_select)

# Split data into training and testing sets
train_data, test_data = preprocessed_data.randomSplit([0.7, 0.3], seed=112)

# Train Decision Tree classifier
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', maxDepth=5)
dt_model = dt.fit(train_data)

# Make predictions on test set
predictions = dt_model.transform(test_data)

# Evaluate performance
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

Accuracy: 0.2815206349752931


In [31]:
df_select = df_t.select(['type', 'subdistrict', 'latitude', 'longitude'])

label_indexer = StringIndexer(inputCol="type", outputCol="label").fit(df_select)
df_select = label_indexer.transform(df_select)
feature_indexer = StringIndexer(inputCol="subdistrict", outputCol="features").fit(df_select)
df_select = feature_indexer.transform(df_select)
# oneHotEncoder = OneHotEncoder(inputCol="subdistrict" + "_index", outputCol="subdistrict" + "_vec")
# feature_indexed = feature_indexer.fit(df_select).transform(df_select)
# ohe = oneHotEncoder.fit(feature_indexer)
# feature_indexer = ohe.transform(feature_indexer)
# assembler = VectorAssembler(inputCols=["subdistrict" + "_vec"], outputCol='features')
# df_select = assembler.transform(df_select)

pipeline = Pipeline(stages=[label_indexer, assembler])
preprocessed_data = pipeline.fit(df_select).transform(df_select)

# Split data into training and testing sets
train_data, test_data = df_select.randomSplit([0.7, 0.3], seed=112)

# Train Decision Tree classifier
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', maxDepth=5)
dt_model = dt.fit(train_data)

# Make predictions on test set
predictions = dt_model.transform(test_data)

# Evaluate performance
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

AttributeError: 'StringIndexer' object has no attribute '_jdf'

In [None]:
df_select = df_t.select(['type', 'subdistrict', 'latitude', 'longitude'])

label_indexer = StringIndexer(inputCol="type", outputCol="label").fit(df_select)
df_select = label_indexer.transform(df_select)
feature_indexer = StringIndexer(inputCol="subdistrict", outputCol="features").fit(df_select)
df_select = feature_indexer.transform(df_select)

pipeline = Pipeline(stages=[label_indexer, assembler])
preprocessed_data = pipeline.fit(df_select).transform(df_select)

# Split data into training and testing sets
train_data, test_data = df_select.randomSplit([0.7, 0.3], seed=112)

# Train Decision Tree classifier
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', maxDepth=5)
dt_model = dt.fit(train_data)

# Make predictions on test set
predictions = dt_model.transform(test_data)

# Evaluate performance
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)