In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType
from pyspark.sql.functions import col, count, sum, rank, when, lit, round, avg
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Terrorist_Pipeline_2")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

Schema_Target = StructType([
    StructField("eventid", LongType(), True),
    StructField("target1", StringType(), True),
    StructField("natlty1_txt", StringType(), True),
    StructField("targtype1_txt", StringType(), True),
    StructField("targsubtype1_txt", StringType(), True),
    StructField("country_txt", StringType(), True),
    StructField("iyear", IntegerType(), True), 
    StructField("imonth", IntegerType(), True), 
    StructField("iday", IntegerType(), True),
    StructField("region_txt", StringType(), True),
    StructField("provstate", StringType(), True),
    StructField("city", StringType(), True)
])

# Define the schema
Schema_Event = StructType([
    StructField("eventid", LongType(), True),
    StructField("iyear", IntegerType(), True),  
    StructField("imonth", IntegerType(), True), 
    StructField("iday", IntegerType(), True),  
    StructField("extended", StringType(), True),
    StructField("resolution", StringType(), True),
    StructField("country_txt", StringType(), True),
    StructField("region_txt", StringType(), True),
    StructField("provstate", StringType(), True),
    StructField("city", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("gname", StringType(), True),
    StructField("gsubname", StringType(), True),
    StructField("gname2", StringType(), True),
    StructField("gsubname2", StringType(), True),
    StructField("gname3", StringType(), True),
    StructField("gsubname3", StringType(), True),
    StructField("success", IntegerType(), True),
    StructField("attacktype1_txt", StringType(), True),
])

#  Google Storage File Path
file_path_targets = 'gs://data_de2024_ga2/targets_of_events.csv'  
file_path_events = 'gs://data_de2024_ga2/event_informations.csv' 
# Create data frame
df_target = spark.read.schema(Schema_Target).format("csv").option("header", "true").option("delimiter", ",") \
       .load(file_path_targets)
df_events = spark.read.schema(Schema_Event).format("csv").option("header", "true").option("delimiter", ",") \
       .load(file_path_events)

#remove null values in target dataframe
nullbeforetarget = df_target.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_target.columns])
nullbeforetarget.show()
df_target = df_target.na.drop("any", subset=["target1", "targtype1_txt"]) #only of relevant columns
nullaftertarget = df_target.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_target.columns])
nullaftertarget.show()

#only select possible relevant features for event dataset
df_events = df_events.select("eventid", "iyear", "imonth", "iday", "extended", "country_txt", "region_txt", "provstate", "city", "latitude", "longitude", "gname", "success", "attacktype1_txt")

#remove null values in event dataframe
nullbefore = df_events.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_events.columns])
nullbefore.show()
df_events = df_events.na.drop("any", subset=["provstate", "city"])
nullafter = df_events.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_events.columns])
nullafter.show()

+-------+-------+-----------+-------------+----------------+-----------+-----+------+----+----------+---------+----+
|eventid|target1|natlty1_txt|targtype1_txt|targsubtype1_txt|country_txt|iyear|imonth|iday|region_txt|provstate|city|
+-------+-------+-----------+-------------+----------------+-----------+-----+------+----+----------+---------+----+
|      0|    638|       1559|            2|           10376|          3|   17|     7|   4|         3|      424| 438|
+-------+-------+-----------+-------------+----------------+-----------+-----+------+----+----------+---------+----+

+-------+-------+-----------+-------------+----------------+-----------+-----+------+----+----------+---------+----+
|eventid|target1|natlty1_txt|targtype1_txt|targsubtype1_txt|country_txt|iyear|imonth|iday|region_txt|provstate|city|
+-------+-------+-----------+-------------+----------------+-----------+-----+------+----+----------+---------+----+
|      0|      0|       1492|            0|           10130|   

In [4]:
df_target.show()

+------------+--------------------+------------------+--------------------+--------------------+------------------+-----+------+----+--------------------+-----------+-------------+
|     eventid|             target1|       natlty1_txt|       targtype1_txt|    targsubtype1_txt|       country_txt|iyear|imonth|iday|          region_txt|  provstate|         city|
+------------+--------------------+------------------+--------------------+--------------------+------------------+-----+------+----+--------------------+-----------+-------------+
|197000000001|        Julio Guzman|Dominican Republic|Private Citizens ...|      Named Civilian|Dominican Republic| 1970|     7|   2|Central America &...|       NULL|Santo Domingo|
|197000000002|Nadine Chaval, da...|           Belgium|Government (Diplo...|Diplomatic Person...|            Mexico| 1970|     0|   0|       North America|    Federal|  Mexico city|
|197001000001|            Employee|     United States| Journalists & Media|Radio Journalist/...

In [18]:
df_events.show()

+------------+-----+------+----+--------+------------------+------------------+-----------+-------------+----------+-----------+--------------------+-------+--------------------+
|     eventid|iyear|imonth|iday|extended|       country_txt|        region_txt|  provstate|         city|  latitude|  longitude|               gname|success|     attacktype1_txt|
+------------+-----+------+----+--------+------------------+------------------+-----------+-------------+----------+-----------+--------------------+-------+--------------------+
|197000000002| 1970|     0|   0|       0|            Mexico|     North America|    Federal|  Mexico city| 19.371887| -99.086624|23rd of September...|      1|Hostage Taking (K...|
|197001000001| 1970|     1|   0|       0|       Philippines|    Southeast Asia|     Tarlac|      Unknown| 15.478598| 120.599741|             Unknown|      1|       Assassination|
|197001000002| 1970|     1|   0|       0|            Greece|    Western Europe|     Attica|       Athens|

In [2]:
#Merge the datasets
join_columns = ['eventid']
overlap_columns = [col_name for col_name in df_target.columns if col_name in df_events.columns and col_name not in join_columns]

df_target= df_target.drop(*overlap_columns)    

jointarget = df_target["eventid"] == df_events['eventid']
target_merged = df_target.join(df_events, jointarget,"left").drop("eventid")

target_filtered = target_merged.filter(
    col("iyear").between(2015, 2017)
)

target_filtered.show(20)

+--------------------+--------------------+--------------------+--------------------+-----+------+----+--------+--------------------+--------------------+--------------------+----------------+---------+---------+--------------------+-------+--------------------+
|             target1|         natlty1_txt|       targtype1_txt|    targsubtype1_txt|iyear|imonth|iday|extended|         country_txt|          region_txt|           provstate|            city| latitude|longitude|               gname|success|     attacktype1_txt|
+--------------------+--------------------+--------------------+--------------------+-----+------+----+--------+--------------------+--------------------+--------------------+----------------+---------+---------+--------------------+-------+--------------------+
|            Officers|              Turkey|              Police|Police Security F...| 2015|     1|   1|       0|              Turkey|Middle East & Nor...|            Istanbul|        Istanbul|41.106178|28.689863

In [40]:
analysis_data = target_filtered.select(
    col("gname").alias("group_name"),
    col("iyear").alias("year"),
    col("targtype1_txt").alias("target_type"),
    col("success").cast("int")
)

grouped_data = analysis_data.groupBy("group_name", "target_type").agg(
    count("*").alias("total_attacks"),
    sum("success").alias("successful_attacks")
)

final_data = grouped_data.withColumn(
    "success_rate", round((col("successful_attacks") / col("total_attacks")) * 100, 2)
)

final_data_ordered = final_data.orderBy("group_name", "target_type")

final_data_ordered.show(truncate=False)

+---------------------------------+------------------------------+-------------+------------------+------------+
|group_name                       |target_type                   |total_attacks|successful_attacks|success_rate|
+---------------------------------+------------------------------+-------------+------------------+------------+
|A'chik Matgrik Elite Force (AMEF)|Business                      |4            |3                 |75.0        |
|A'chik Matgrik Elite Force (AMEF)|Private Citizens & Property   |2            |2                 |100.0       |
|Abbala extremists                |Private Citizens & Property   |4            |4                 |100.0       |
|Abbala extremists                |Religious Figures/Institutions|1            |1                 |100.0       |
|Abdul Ghani Kikli Militia        |Private Citizens & Property   |1            |1                 |100.0       |
|Abu Abbas Brigade                |Government (General)          |1            |1               

In [41]:
group_activity = target_filtered.groupBy("gname", "iyear")\
    .agg(count("*").alias("total_attacks"))

group_rank_window = Window.partitionBy("iyear").orderBy(col("total_attacks").desc())
group_activity_ranked = group_activity.withColumn("rank", rank().over(group_rank_window))

group_activity_ranked.filter(col("rank") <= 5).show()

+--------------------+-----+-------------+----+
|               gname|iyear|total_attacks|rank|
+--------------------+-----+-------------+----+
|             Unknown| 2015|         6397|   1|
|             Taliban| 2015|         1249|   2|
|Islamic State of ...| 2015|         1221|   3|
|          Boko Haram| 2015|          540|   4|
|          Al-Shabaab| 2015|          397|   5|
|             Unknown| 2016|         5969|   1|
|Islamic State of ...| 2016|         1454|   2|
|             Taliban| 2016|         1065|   3|
|          Al-Shabaab| 2016|          564|   4|
|Kurdistan Workers...| 2016|          363|   5|
|             Unknown| 2017|         4349|   1|
|Islamic State of ...| 2017|         1315|   2|
|             Taliban| 2017|          894|   3|
|          Al-Shabaab| 2017|          570|   4|
|New People's Army...| 2017|          358|   5|
+--------------------+-----+-------------+----+



In [42]:
group_activity = target_filtered.groupBy("region_txt", "iyear")\
    .agg(count("*").alias("total_attacks"))

group_rank_window = Window.partitionBy("iyear").orderBy(col("total_attacks").desc())
group_activity_ranked = group_activity.withColumn("rank", rank().over(group_rank_window))

group_activity_ranked.filter(col("rank") <= 5)

group_activity_cleaned = group_activity_ranked.selectExpr(
    "region_txt as region_name",
    "iyear as year",
    "total_attacks",
    "rank"
)

group_activity_cleaned.show()

+--------------------+----+-------------+----+
|         region_name|year|total_attacks|rank|
+--------------------+----+-------------+----+
|Middle East & Nor...|2015|         6036|   1|
|          South Asia|2015|         4585|   2|
|  Sub-Saharan Africa|2015|         1964|   3|
|      Southeast Asia|2015|         1072|   4|
|      Eastern Europe|2015|          684|   5|
|      Western Europe|2015|          333|   6|
|       South America|2015|          176|   7|
|       North America|2015|           62|   8|
|           East Asia|2015|           28|   9|
|Australasia & Oce...|2015|           14|  10|
|        Central Asia|2015|           10|  11|
|Central America &...|2015|            1|  12|
|Middle East & Nor...|2016|         6115|   1|
|          South Asia|2016|         3639|   2|
|  Sub-Saharan Africa|2016|         2077|   3|
|      Southeast Asia|2016|         1077|   4|
|      Western Europe|2016|          273|   5|
|       South America|2016|          159|   6|
|      Easter

In [46]:
analysis_data = target_filtered.select(
    col("region_txt").alias("region"),
    col("targtype1_txt").alias("target_type"),
    col("iyear").alias("year"), 
    col("attacktype1_txt").alias("attack_type"),
    col("gname").alias("group_name"),
    col("success").cast("int"),
    col("latitude").cast("double"),
    col("longitude").cast("double")
).filter(col("region").isNotNull() & col("latitude").isNotNull() & col("longitude").isNotNull())

aggregated_data = analysis_data.groupBy("region", "year").agg(
    count("*").alias("total_attacks"),
    sum("success").alias("successful_attacks")
).withColumn(
    "success_rate", round((col("successful_attacks") / col("total_attacks")) * 100, 2)
).distinct()

vector_assembler = VectorAssembler(
    inputCols=["total_attacks", "successful_attacks", "success_rate"],
    outputCol="features"
)
cluster_data = vector_assembler.transform(aggregated_data)

kmeans = KMeans(k=5, seed=1, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(cluster_data)

clustered_data = model.transform(cluster_data).select(
    col("region"),
    col("year"),
    col("total_attacks"),
    col("successful_attacks"),
    col("success_rate"),
    col("cluster")
).distinct()

trend_summary = target_filtered.groupBy("region_txt", "iyear").agg(
    count("*").alias("attack_count"),
    round(avg("latitude").cast("double"), 6).alias("latitude"),  
    round(avg("longitude").cast("double"), 6).alias("longitude") 
).withColumnRenamed("region_txt", "region").withColumnRenamed("iyear", "year")

window_spec = Window.partitionBy("region").orderBy("year").rowsBetween(-1, 1)
rolling_avg = trend_summary.withColumn(
    "rolling_avg_attacks",
    round(avg("attack_count").over(window_spec), 2)
)

final_data = clustered_data.join(
    rolling_avg.select("region", "year", "rolling_avg_attacks", "latitude", "longitude"),
    on=["region", "year"],
    how="left"
)

final_data_ordered = final_data.orderBy("region", "year", "cluster")
final_data_ordered.show(50, truncate=False)

+---------------------------+----+-------------+------------------+------------+-------+-------------------+----------+----------+
|region                     |year|total_attacks|successful_attacks|success_rate|cluster|rolling_avg_attacks|latitude  |longitude |
+---------------------------+----+-------------+------------------+------------+-------+-------------------+----------+----------+
|Australasia & Oceania      |2015|14           |13                |92.86       |1      |12.0               |-35.572121|147.485055|
|Australasia & Oceania      |2016|10           |10                |100.0       |1      |12.0               |-35.90628 |143.257762|
|Australasia & Oceania      |2017|12           |9                 |75.0        |1      |11.0               |-16.273076|147.394068|
|Central America & Caribbean|2015|1            |0                 |0.0         |1      |2.0                |10.637896 |-61.405589|
|Central America & Caribbean|2016|3            |3                 |100.0       |1  

In [12]:
analysis_data = target_filtered.select(
    col("region_txt").alias("region"),
    col("targtype1_txt").alias("target_type"),
    col("iyear").alias("year"), 
    col("attacktype1_txt").alias("attack_type"),
    col("gname").alias("group_name"),
    col("success").cast("int"),
    col("latitude").cast("double"),
    col("longitude").cast("double")
).filter(col("region").isNotNull() & col("latitude").isNotNull() & col("longitude").isNotNull())

target_summary = analysis_data.groupBy("region", "year", "target_type").agg(
    count("*").alias("target_type_count")
)

target_distribution = target_summary.withColumn(
    "target_type_proportion",
    round((col("target_type_count") / sum("target_type_count").over(
        Window.partitionBy("region", "year")
    )) * 100, 2)
)

pivoted_targets = target_distribution.groupBy("region", "year").pivot(
    "target_type"
).agg(
    round(avg("target_type_proportion"), 2).alias("avg_target_proportion")
).fillna(0) 

aggregated_data = analysis_data.groupBy("region", "year").agg(
    count("*").alias("total_attacks"),
    sum("success").alias("successful_attacks")
).withColumn(
    "success_rate", round((col("successful_attacks") / col("total_attacks")) * 100, 2)
)

aggregated_with_targets = aggregated_data.join(
    pivoted_targets,
    on=["region", "year"],
    how="left"
)

target_columns = [col_name for col_name in pivoted_targets.columns if col_name not in ["region", "year"]]
vector_assembler = VectorAssembler(
    inputCols=["total_attacks", "successful_attacks", "success_rate"] + target_columns,
    outputCol="features"
)
cluster_data = vector_assembler.transform(aggregated_with_targets)

kmeans = KMeans(k=5, seed=1, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(cluster_data)

clustered_data = model.transform(cluster_data).select(
    col("region"),
    col("year"),
    col("total_attacks"),
    col("successful_attacks"),
    col("success_rate"),
    col("cluster"),
    *target_columns 
)

trend_summary = target_filtered.groupBy("region_txt", "iyear").agg(
    count("*").alias("attack_count"),
    round(avg("latitude").cast("double"), 6).alias("latitude"),
    round(avg("longitude").cast("double"), 6).alias("longitude")
).withColumnRenamed("region_txt", "region").withColumnRenamed("iyear", "year")

window_spec = Window.partitionBy("region").orderBy("year").rowsBetween(-1, 1)
rolling_avg = trend_summary.withColumn(
    "rolling_avg_attacks",
    round(avg("attack_count").over(window_spec), 2)
)

final_data = clustered_data.join(
    rolling_avg.select("region", "year", "rolling_avg_attacks", "latitude", "longitude"),
    on=["region", "year"],
    how="left"
)

change_column_names = [
    f"`{col_name}` as `{col_name.lower().replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_')}`"
    for col_name in target_columns
]

final_cleaned_data = final_data.selectExpr(
    "region",                           # Region name
    "year",                             # Year of the data
    "total_attacks",                    # Total number of attacks
    "successful_attacks",               # Number of successful attacks
    "success_rate",                     # Success rate of attacks
    "cluster",                          # Cluster assigned by KMeans
    "rolling_avg_attacks",              # Rolling average of attacks over 3 years
    "latitude",                         # Average latitude for the region and year
    "longitude",                        # Average longitude for the region and year
    *change_column_names                    # Cleaned target type proportions
)


final_target = final_cleaned_data.orderBy("region", "year", "cluster")
final_target.show(50, truncate=False)

+---------------------------+----+-------------+------------------+------------+-------+-------------------+----------+----------+----------------+-------------------+--------+-----------------------+--------------------+---------------------+------------------+-------------------+--------+--------+-----+-----+------+---------------------------+------------------------------+-----------------+----------------------------+--------+--------------+-------+---------+-----------------------+
|region                     |year|total_attacks|successful_attacks|success_rate|cluster|rolling_avg_attacks|latitude  |longitude |abortion_related|airports_&_aircraft|business|educational_institution|food_or_water_supply|government_diplomatic|government_general|journalists_&_media|maritime|military|ngo  |other|police|private_citizens_&_property|religious_figures_institutions|telecommunication|terrorists_non-state_militia|tourists|transportation|unknown|utilities|violent_political_party|
+---------------

In [13]:
# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
bucket = "temp_de2024_ga2"  # use your bucket 
spark.conf.set('temporaryGcsBucket', bucket)
# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
# Saving the data to BigQuery
final_target.write.format('bigquery') \
  .option('table', 'de2024-435209.DE_Groupassignment_2.target_events') \
  .mode("overwrite") \
  .save()

In [None]:
# Stop the spark context
spark.stop()