In [0]:
from pyspark.sql.functions import col

df_raw = spark.table("bigdata_finals.default.raw_terorism")
df_raw.show(5)
df_raw.printSchema()


+------------+-----+------+----+----------+--------+----------+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------+--------+-------+-----+-----+-----+---------+-----------+---------------+--------+-------+-------+-----------+--------------------+-----------+---------------+-----------+---------------+---------+--------------------+------------+--------------------+--------------------+--------------------+-------+------------------+---------+-------------+------------+----------------+-----+-------+-------+-----------+---------+-------------+------------+----------------+-----+-------+-------+-----------+--------------------+--------+------+---------+------+---------+------+-----------+-----------+-----------+----------+------+--------+-------+---------+-------------+------+----------+--------------+------+----------+--------------+---------+---------+-------------+------------+--------------------+---------+---------

In [0]:
from pyspark.sql.functions import col, when

df_casted = df_raw

# Convert integer-like columns
int_columns = [
    "iyear", "imonth", "iday",
    "country", "region",
    "attacktype1", "targtype1",
    "success", "suicide",
    "nkill", "nwound"
]

for c in int_columns:
    if c in df_casted.columns:
        df_casted = df_casted.withColumn(c, col(c).cast("int"))

# Convert double/float columns
double_columns = ["latitude", "longitude"]

for c in double_columns:
    if c in df_casted.columns:
        df_casted = df_casted.withColumn(c, col(c).cast("double"))

# Replace null casualties with 0
df_casted = (
    df_casted
    .withColumn("nkill", when(col("nkill").isNull(), 0).otherwise(col("nkill")))
    .withColumn("nwound", when(col("nwound").isNull(), 0).otherwise(col("nwound")))
)

# Create total casualties
df_casted = df_casted.withColumn("casualties", col("nkill") + col("nwound"))

# Convert text fields to string (safety)
string_columns = [
    "country_txt", "region_txt", "provstate", "city",
    "attacktype1_txt", "weaptype1_txt",
    "targtype1_txt", "gname"
]

for c in string_columns:
    if c in df_casted.columns:
        df_casted = df_casted.withColumn(c, col(c).cast("string"))

# Clean invalid coordinates (optional but recommended)
df_casted = df_casted.filter(
    (col("latitude").between(-90, 90)) &
    (col("longitude").between(-180, 180))
)

# Final check
df_casted.printSchema()
df_casted.show(5)


root
 |-- eventid: long (nullable = true)
 |-- iyear: integer (nullable = true)
 |-- imonth: integer (nullable = true)
 |-- iday: integer (nullable = true)
 |-- approxdate: string (nullable = true)
 |-- extended: long (nullable = true)
 |-- resolution: date (nullable = true)
 |-- country: integer (nullable = true)
 |-- country_txt: string (nullable = true)
 |-- region: integer (nullable = true)
 |-- region_txt: string (nullable = true)
 |-- provstate: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- specificity: long (nullable = true)
 |-- vicinity: long (nullable = true)
 |-- location: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- crit1: long (nullable = true)
 |-- crit2: long (nullable = true)
 |-- crit3: long (nullable = true)
 |-- doubtterr: long (nullable = true)
 |-- alternative: long (nullable = true)
 |-- alternative_txt: string (nullable = true)
 |-- multipl

In [0]:
cols_to_keep = [
    "eventid", "iyear", "imonth", "iday",
    "country", "country_txt",
    "region", "region_txt",
    "provstate", "city",
    "latitude", "longitude",
    "attacktype1", "attacktype1_txt",
    "gname",
    "nkill", "nwound",
    "weaptype1_txt",
    "targtype1_txt",
    "success", "suicide"
]

df = df_raw.select(*cols_to_keep)
df.show(5)
df.printSchema()


+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+
|     eventid|iyear|imonth|iday|country|       country_txt|region|          region_txt|provstate|         city| latitude| longitude|attacktype1|     attacktype1_txt|               gname|nkill|nwound|weaptype1_txt|       targtype1_txt|success|suicide|
+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+
|197000000001| 1970|     7|   2|     58|Dominican Republic|     2|Central America &...|     NULL|Santo Domingo|18.456792|-69.951164|          1|       Assassination|              MANO-D|    1|   0.0|      Unknown|Private Citizens ...|      1|     

In [0]:
from pyspark.sql.functions import col

df = (
    df
    .withColumn("iyear", col("iyear").cast("int"))
    .withColumn("imonth", col("imonth").cast("int"))
    .withColumn("iday", col("iday").cast("int"))
    .withColumn("latitude", col("latitude").cast("double"))
    .withColumn("longitude", col("longitude").cast("double"))
    .withColumn("nkill", col("nkill").cast("double"))
    .withColumn("nwound", col("nwound").cast("double"))
    .withColumn("success", col("success").cast("int"))
    .withColumn("suicide", col("suicide").cast("int"))
)
df.show(5)
df.printSchema()


+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+
|     eventid|iyear|imonth|iday|country|       country_txt|region|          region_txt|provstate|         city| latitude| longitude|attacktype1|     attacktype1_txt|               gname|nkill|nwound|weaptype1_txt|       targtype1_txt|success|suicide|
+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+
|197000000001| 1970|     7|   2|     58|Dominican Republic|     2|Central America &...|     NULL|Santo Domingo|18.456792|-69.951164|          1|       Assassination|              MANO-D|  1.0|   0.0|      Unknown|Private Citizens ...|      1|     

In [0]:
# incident_date from (iyear, imonth, iday)

# casualties = nkill + nwound

# decade (1970s, 1980s, etc.)

# high_casualty flag (casualties â‰¥ 10)

from pyspark.sql.functions import when, concat_ws, to_date, lit

# Build a date string "iyear-imonth-iday", but handle 0 month/day as null
df = df.withColumn(
    "date_str",
    when((col("imonth") > 0) & (col("iday") > 0),
         concat_ws("-", col("iyear"), col("imonth"), col("iday")))
    .otherwise(None)
)

df = df.withColumn("incident_date", to_date(col("date_str"), "yyyy-M-d")).drop("date_str")

# Casualties: kills + wounds
df = df.withColumn("nkill", when(col("nkill").isNull(), 0).otherwise(col("nkill")))
df = df.withColumn("nwound", when(col("nwound").isNull(), 0).otherwise(col("nwound")))

df = df.withColumn("casualties", col("nkill") + col("nwound"))

# Decade
df = df.withColumn(
    "decade",
    when(col("iyear") < 1980, "1970s")
    .when(col("iyear") < 1990, "1980s")
    .when(col("iyear") < 2000, "1990s")
    .when(col("iyear") < 2010, "2000s")
    .when(col("iyear") < 2020, "2010s")
    .otherwise("2020s+")
)

# High casualty flag
df = df.withColumn(
    "high_casualty",
    when(col("casualties") >= 10, "High (>=10)")
    .otherwise("Low/Medium (<10)")
)

df.show(5)
df.printSchema()


+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+-------------+----------+------+----------------+
|     eventid|iyear|imonth|iday|country|       country_txt|region|          region_txt|provstate|         city| latitude| longitude|attacktype1|     attacktype1_txt|               gname|nkill|nwound|weaptype1_txt|       targtype1_txt|success|suicide|incident_date|casualties|decade|   high_casualty|
+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+-------------+----------+------+----------------+
|197000000001| 1970|     7|   2|     58|Dominican Republic|     2|Central America &...|     NULL|San

In [0]:
# Drop records without year or region
df_clean = df.filter(col("iyear").isNotNull() & col("region_txt").isNotNull())

df_clean.show(5)
df_clean.printSchema()


+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+-------------+----------+------+----------------+
|     eventid|iyear|imonth|iday|country|       country_txt|region|          region_txt|provstate|         city| latitude| longitude|attacktype1|     attacktype1_txt|               gname|nkill|nwound|weaptype1_txt|       targtype1_txt|success|suicide|incident_date|casualties|decade|   high_casualty|
+------------+-----+------+----+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------------------+--------------------+-----+------+-------------+--------------------+-------+-------+-------------+----------+------+----------------+
|197000000001| 1970|     7|   2|     58|Dominican Republic|     2|Central America &...|     NULL|San

In [0]:
df_clean.write.mode("overwrite").saveAsTable("bigdata_finals.default.clean_terror")