In [1]:
# Import findspark to make pyspark importable as a regular library
import findspark
findspark.init('/usr/local/spark')

In [2]:
# Import required python library
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Create Spark Session
# The entry point to programming Spark with the Dataset 
spark = SparkSession.builder.appName("ACLED Middle East and South East Asia Clustering").getOrCreate()

In [3]:
# Print spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f71919ca5d0>


In [4]:
# Load the dataset
df = spark.read.csv("/root/Lecture/BIGDATA/datasets/asia_conflicts.csv", header=True, inferSchema=True)

In [5]:
# Print top 20 rows data
df.show()

+-------+---+-------------+----------------+------------+----+--------------+--------------------+--------------------+--------------------+------+--------------------+--------------------+------+-----------+-------------+----------+-----------------+-----------------+------------------+------------------+--------+---------+-------------+--------------------+------------+--------------------+--------------------+----------+----------+
|data_id|iso|event_id_cnty|event_id_no_cnty|  event_date|year|time_precision|          event_type|              actor1|       assoc_actor_1|inter1|              actor2|       assoc_actor_2|inter2|interaction|       region|   country|           admin1|           admin2|            admin3|          location|latitude|longitude|geo_precision|              source|source_scale|               notes|          fatalities| timestamp|      iso3|
+-------+---+-------------+----------------+------------+----+--------------+--------------------+--------------------+---

In [6]:
# Count data rows
df.count()

216679

In [7]:
# inferSchema is used to inference the actual datatype of columns, especially for dates and timestamp
df.printSchema()

root
 |-- data_id: integer (nullable = true)
 |-- iso: integer (nullable = true)
 |-- event_id_cnty: string (nullable = true)
 |-- event_id_no_cnty: integer (nullable = true)
 |-- event_date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- time_precision: integer (nullable = true)
 |-- event_type: string (nullable = true)
 |-- actor1: string (nullable = true)
 |-- assoc_actor_1: string (nullable = true)
 |-- inter1: integer (nullable = true)
 |-- actor2: string (nullable = true)
 |-- assoc_actor_2: string (nullable = true)
 |-- inter2: integer (nullable = true)
 |-- interaction: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- admin1: string (nullable = true)
 |-- admin2: string (nullable = true)
 |-- admin3: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- geo_precision: integer (nullable = true)
 |-- source

In [8]:
from pyspark.sql.functions import col

In [9]:
# Register the dataframe as a SQL temporary view
df.createOrReplaceTempView("asia_conflicts")

In [10]:
# Retrieve the data needed:
# - Latitude
# - Longitude
# - Event Type
# - Region

# data = spark.sql("SELECT `latitude`, `longitude`, `region`, `country`, `event_type`\
#                   FROM asia_conflicts")
data = spark.sql("SELECT `latitude`, `longitude`, `region`, `country`, `event_type`, `actor1`\
                  FROM asia_conflicts")

In [11]:
data.show()

+--------+---------+-------------+----------+--------------------+--------------------+
|latitude|longitude|       region|   country|          event_type|              actor1|
+--------+---------+-------------+----------+--------------------+--------------------+
| 22.3571|  91.7823|Southern Asia|Bangladesh|      Riots/Protests|Protesters (Bangl...|
| 17.8635|  82.1959|Southern Asia|     India|Violence against ...|CPI (Maoist): Com...|
| 17.8619|   82.197|Southern Asia|     India|Violence against ...|CPI (Maoist): Com...|
| 30.7362|  76.7884|Southern Asia|     India|      Riots/Protests|  Protesters (India)|
| 32.9413|  74.5498|Southern Asia|     India|Battle-No change ...|Military Forces o...|
| 32.9917|   74.932|Southern Asia|     India|      Riots/Protests|  Protesters (India)|
| 22.1669|  88.2822|Southern Asia|     India|Violence against ...|Unidentified Arme...|
| 20.2724|  85.8338|Southern Asia|     India|      Riots/Protests|     Rioters (India)|
| 24.7908|  93.9302|Southern Asi

In [12]:
data.count()

216679

In [13]:
# Import VectorAssembler
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(inputCols=["latitude", "longitude"], outputCol="features")
df_assembled = vector_assembler.transform(data)

df_assembled.show()

+--------+---------+-------------+----------+--------------------+--------------------+-----------------+
|latitude|longitude|       region|   country|          event_type|              actor1|         features|
+--------+---------+-------------+----------+--------------------+--------------------+-----------------+
| 22.3571|  91.7823|Southern Asia|Bangladesh|      Riots/Protests|Protesters (Bangl...|[22.3571,91.7823]|
| 17.8635|  82.1959|Southern Asia|     India|Violence against ...|CPI (Maoist): Com...|[17.8635,82.1959]|
| 17.8619|   82.197|Southern Asia|     India|Violence against ...|CPI (Maoist): Com...| [17.8619,82.197]|
| 30.7362|  76.7884|Southern Asia|     India|      Riots/Protests|  Protesters (India)|[30.7362,76.7884]|
| 32.9413|  74.5498|Southern Asia|     India|Battle-No change ...|Military Forces o...|[32.9413,74.5498]|
| 32.9917|   74.932|Southern Asia|     India|      Riots/Protests|  Protesters (India)| [32.9917,74.932]|
| 22.1669|  88.2822|Southern Asia|     India|V

In [14]:
# Import KMeans
from pyspark.ml.clustering import KMeans

# Train a k-means model
kmeans = KMeans().setK(5).setSeed(1)
model = kmeans.fit(df_assembled.select('features'))

In [15]:
# Make predictions by transforming the initial dataframe
transform = model.transform(df_assembled)

transform.show()

+--------+---------+-------------+----------+--------------------+--------------------+-----------------+----------+
|latitude|longitude|       region|   country|          event_type|              actor1|         features|prediction|
+--------+---------+-------------+----------+--------------------+--------------------+-----------------+----------+
| 22.3571|  91.7823|Southern Asia|Bangladesh|      Riots/Protests|Protesters (Bangl...|[22.3571,91.7823]|         2|
| 17.8635|  82.1959|Southern Asia|     India|Violence against ...|CPI (Maoist): Com...|[17.8635,82.1959]|         1|
| 17.8619|   82.197|Southern Asia|     India|Violence against ...|CPI (Maoist): Com...| [17.8619,82.197]|         1|
| 30.7362|  76.7884|Southern Asia|     India|      Riots/Protests|  Protesters (India)|[30.7362,76.7884]|         3|
| 32.9413|  74.5498|Southern Asia|     India|Battle-No change ...|Military Forces o...|[32.9413,74.5498]|         3|
| 32.9917|   74.932|Southern Asia|     India|      Riots/Protest

In [16]:
# Import ClusteringEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(transform)

In [17]:
# Print the Silhouette score
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.806699423254


In [18]:
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[29.75465536 40.22710422]
[11.78937851 78.40535501]
[24.38819311 90.41659653]
[30.37042443 70.9009786 ]
[ 10.68785236 112.95842702]


In [19]:
# Convert to Pandas
import pandas as pd

# Convert to CSV, so mapbox API can render it
transform.toPandas().to_csv('/root/Lecture/BIGDATA/datasets/big_data_asia_conflicts.csv')

In [20]:
# Import pixiedust to do visualization
import pixiedust

# Open the CSV file
prediction = pixiedust.sampleData('file:///root/Lecture/BIGDATA/datasets/big_data_asia_conflicts.csv')

Pixiedust database opened successfully


Downloading 'file:///root/Lecture/BIGDATA/datasets/big_data_asia_conflicts.csv' from file:///root/Lecture/BIGDATA/datasets/big_data_asia_conflicts.csv
Downloaded 24546457 bytes
Creating pySpark DataFrame for 'file:///root/Lecture/BIGDATA/datasets/big_data_asia_conflicts.csv'. Please wait...
Loading file using 'SparkSession'
Successfully created pySpark DataFrame for 'file:///root/Lecture/BIGDATA/datasets/big_data_asia_conflicts.csv'


In [None]:
display(prediction)