In [1]:
# findspark has to run before anything from pyspark is imported: init() locates
# the Spark installation and puts its Python bindings on sys.path.
import findspark
findspark.init()

import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
from pyspark.sql import SparkSession, SQLContext

# Display the Spark installation that was picked up (shown as the cell output).
findspark.find()

'C:\\spark\\spark-3.4.2-bin-hadoop3'

# Local MongoDB Connection

In [2]:
# Build (or reuse) the Spark session; the MongoDB connector is requested through
# the spark.jars.packages setting.
spark = (
    SparkSession.builder
    .appName("MongoDB")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

# Load the GTD.gtd_clean collection from the MongoDB server on localhost.
local_reader = (
    spark.read.format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://localhost:27017/")
    .option("database", "GTD")
    .option("collection", "gtd_clean")
)
df_local = local_reader.load()

# Print a preview of the loaded rows.
df_local.show()

+-------+--------+-------+--------+--------------------+-----------+--------------------+-------------+-------+---------+-------------+---------+--------------------+-------+------------------+-----+-----+-----+---+---------+----------+--------+--------------------+--------+-----------+--------------+--------------------+----------+---------+----------+-----------+-----+--------------------+--------+-------+------------------+-----+--------+------+-----+--------+--------+------+---------+------+--------+---------+--------+---------+-----------+------+---------+-----------+----------+------------+------+--------------------+-------+----------+-----------+-------+-------+--------------------+--------------------+------------+--------------------+---------+--------------------+--------+--------------------+------------+--------------------+---------+-------------+----+
|INT_ANY|INT_IDEO|INT_LOG|INT_MISC|                 _id|attacktype1|     attacktype1_txt|         city|claimed|claimmode|

# MongoDB Cluster Connection

In [3]:
import os

# MongoDB URI, including credentials and the database name, e.g.
#   mongodb+srv://<user>:<password>@<cluster-host>/gtd
# It is read from the environment so the secret never lands in the notebook or
# its version history. The password that used to be hardcoded here must be
# treated as leaked and rotated.
mongo_uri = os.environ.get("MONGODB_URI")
if not mongo_uri:
    raise RuntimeError(
        "Set the MONGODB_URI environment variable to the cluster connection "
        "string (mongodb+srv://<user>:<password>@<host>/<database>)."
    )

# Create SparkSession
# NOTE(review): if a session already exists in this kernel, getOrCreate() reuses
# it - confirm that the connector coordinate below actually takes effect.
spark = SparkSession.builder.appName("MongoDB_Spark_Connection")\
        .master("local[*]")\
        .config("spark.mongodb.input.uri", mongo_uri)\
        .config("spark.mongodb.output.uri", mongo_uri)\
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")\
        .getOrCreate()

# Read data from the 'globalterrorismdb' collection; the collection name is
# appended to the URI as <database>.<collection>.
df_cluster = spark.read.format("mongo").option("uri", f"{mongo_uri}.globalterrorismdb").load()

# Show the DataFrame
df_cluster.show()

+-------+--------+-------+--------+--------------------+--------------------+-----------+--------------------+----------+-----------+--------------------+-----------+---------------+-----------+---------------+-------------+------+------+-------+---------+----------+--------------+-------------+---------+--------------------+-----+-----+-------+------------------+-----+-----+-----+--------------+------+---------+------------+--------+--------------------+------+------+--------+---------+-----------+-----------+-----------+--------------+--------------------+----+------+----------+---------+-----+----------+--------------------+-----------+--------------------+--------+-------+------------------+-------+-----------+-------+-----------+-----+--------+----------+------+-----+--------+-------+--------+------+---------+------+--------+--------+--------------------+--------+----------+--------------------+---------+-----------+------+---------+-----------+----------+----------+------------+-

In [4]:
from pyspark.sql.functions import col

# Give the date-part columns their plain names: iyear/imonth/iday -> year/month/day.
for old_name, new_name in (('iyear', 'year'), ('imonth', 'month'), ('iday', 'day')):
    df_cluster = df_cluster.withColumnRenamed(old_name, new_name)

# Cast 'approxdate' to a string column.
approxdate_as_text = col('approxdate').cast('string')
df_cluster = df_cluster.withColumn('approxdate', approxdate_as_text)

# Parsing approxdate and creating the start_date and end_date columns

In [5]:
import re
from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, StructType, StructField

# Define the function to parse dates
def parse_date(date_str):
    """Parse an ``approxdate`` range string into ISO start and end dates.

    Recognised layouts (the text must start with one of them):

    * ``"January 19-20, 1970"``   - day range inside one month
    * ``"May 27 - June 5, 1970"`` - day range spanning two months
    * ``"April - May, 1980"``     - whole-month range; the end date is the
      last day of the second month

    If the end day does not exist in its month (e.g. ``"February 28-30"``),
    the end date rolls over to the first day of the following month, carrying
    into the next year after December.

    Args:
        date_str: The raw text, or ``None`` / empty.

    Returns:
        A ``(start_date, end_date)`` tuple of ``"YYYY-MM-DD"`` strings, or
        ``(None, None)`` when the input is empty, matches no layout, or names
        a month or start day that cannot be parsed.
    """
    if not date_str:
        return None, None

    iso = "%Y-%m-%d"

    def as_datetime(month_name, day, year):
        # Raises ValueError for unknown month names or impossible days.
        return datetime.strptime(f"{month_name} {day} {year}", "%B %d %Y")

    def first_of_next_month(moment):
        # month // 12 is 1 only for December, which carries into the next year.
        return datetime(moment.year + moment.month // 12, moment.month % 12 + 1, 1)

    def end_or_rollover(month_name, day, year):
        # An end day past the month's length rolls over to the first day of
        # the month that follows the *end* month.
        try:
            return as_datetime(month_name, day, year)
        except ValueError:
            return first_of_next_month(as_datetime(month_name, 1, year))

    try:
        # e.g., January 19-20, 1970
        same_month = re.match(r'([A-Za-z]+) (\d+)-(\d+), (\d{4})', date_str)
        if same_month:
            month, start_day, end_day, year = same_month.groups()
            start = as_datetime(month, start_day, year)
            end = end_or_rollover(month, end_day, year)
            return start.strftime(iso), end.strftime(iso)

        # e.g., May 27 - June 5, 1970
        two_months = re.match(r'([A-Za-z]+) (\d+) - ([A-Za-z]+) (\d+), (\d{4})', date_str)
        if two_months:
            start_month, start_day, end_month, end_day, year = two_months.groups()
            start = as_datetime(start_month, start_day, year)
            end = end_or_rollover(end_month, end_day, year)
            return start.strftime(iso), end.strftime(iso)

        # e.g., April - May, 1980
        whole_months = re.match(r'([A-Za-z]+) - ([A-Za-z]+), (\d{4})', date_str)
        if whole_months:
            start_month, end_month, year = whole_months.groups()
            start = as_datetime(start_month, 1, year)
            # Last day of the end month = the day before the next month starts.
            next_month_start = first_of_next_month(as_datetime(end_month, 1, year))
            last_day = datetime.fromordinal(next_month_start.toordinal() - 1)
            return start.strftime(iso), last_day.strftime(iso)
    except ValueError:
        # The text looked like a range but contained an unparseable month or
        # day; report "no date" instead of failing the whole Spark job.
        return None, None

    return None, None

# Wrap parse_date as a Spark UDF that returns a struct of two nullable strings.
parsed_dates_schema = StructType([
    StructField("start_date", StringType(), True),
    StructField("end_date", StringType(), True),
])
parse_date_udf = udf(parse_date, parsed_dates_schema)

# Run the UDF over 'approxdate', keeping the struct in a temporary column.
df_cluster = df_cluster.withColumn("parsed_dates", parse_date_udf(df_cluster['approxdate']))

# Promote the struct fields to top-level columns, then discard the struct.
for field_name in ("start_date", "end_date"):
    df_cluster = df_cluster.withColumn(field_name, col("parsed_dates").getItem(field_name))
df_cluster = df_cluster.drop("parsed_dates")


In [6]:
from pyspark.sql.functions import to_date

# Turn the "yyyy-MM-dd" start_date / end_date strings into Spark date columns.
for date_column in ("start_date", "end_date"):
    df_cluster = df_cluster.withColumn(date_column, to_date(col(date_column), "yyyy-MM-dd"))


# Calculate the average date and store it in the approxdate column

In [7]:
from pyspark.sql.functions import col, datediff, expr, date_add

# approxdate becomes the midpoint of the range: start_date plus half of the day
# difference between end_date and start_date (the half is cast to int).
half_span_days = (datediff(col("end_date"), col("start_date")) / 2).cast("int")
df_cluster = df_cluster.withColumn("approxdate", date_add(col("start_date"), half_span_days))


In [8]:
from pyspark.sql.functions import col, dayofmonth, to_date

from pyspark.sql.functions import coalesce

# Take the day from approxdate where an approximate date exists, and keep the
# existing day everywhere else. Writing dayofmonth(...) unconditionally would
# replace 'day' with null on every row whose approxdate is null.
df_cluster = df_cluster.withColumn(
    "day",
    coalesce(dayofmonth(to_date(col("approxdate"), "yyyy-MM-dd")), col("day"))
)

In [9]:
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import IntegerType, BooleanType
import random

# Define UDF to generate a random day for a given month and year
def random_day_for_month(month, year):
    """Pick a random valid day-of-month for the given month.

    Args:
        month: Month number; the values 1-12 are recognised.
        year: Year, used only to decide whether February has 29 days. May be
            ``None``, in which case February is treated as having 28 days so
            the result is valid in any year.

    Returns:
        A random int between 1 and the month's length (inclusive), or ``None``
        when ``month`` is not one of 1-12 (including ``None``).
    """
    if month in (1, 3, 5, 7, 8, 10, 12):  # Months with 31 days
        last_day = 31
    elif month in (4, 6, 9, 11):  # Months with 30 days
        last_day = 30
    elif month == 2:
        # Gregorian leap-year rule; a missing year must not raise here.
        is_leap_year = year is not None and (
            (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0)
        )
        last_day = 29 if is_leap_year else 28
    else:
        # Unknown month (e.g. 0 or None): no day can be chosen.
        return None
    return random.randint(1, last_day)

# Register UDF. The function draws random numbers, so it is marked
# non-deterministic: Spark treats UDFs as deterministic by default and may
# otherwise eliminate or duplicate invocations during optimisation.
random_day_udf = udf(random_day_for_month, IntegerType()).asNondeterministic()

# Define the UDF to set 'pred_date'
def set_pred_date(day):
    """Report whether a day value is missing, i.e. is ``None`` or equals 0."""
    if day is None:
        return True
    if day == 0:
        return True
    return False

# Wrap set_pred_date as a boolean Spark UDF.
set_pred_date_udf = udf(set_pred_date, BooleanType())

# pred_date is True on rows whose day is null or 0, False otherwise.
day_is_missing = col("day").isNull() | (col("day") == 0)
df_cluster = df_cluster.withColumn("pred_date", when(day_is_missing, True).otherwise(False))

# Rows flagged by pred_date get a randomly drawn day for their month/year;
# all other rows keep the day they already have.
drawn_day = random_day_udf(col("month"), col("year"))
df_cluster = df_cluster.withColumn("day", when(col("pred_date"), drawn_day).otherwise(col("day")))


In [10]:
# Preview the DataFrame after the date-related transformations.
df_cluster.show()

+-------+--------+-------+--------+--------------------+--------------------+-----------+--------------------+----------+-----------+--------------------+-----------+---------------+-----------+---------------+-------------+------+------+-------+---------+----------+--------------+-------------+---------+--------------------+-----+-----+-------+------------------+-----+-----+-----+--------------+------+---------+------------+--------+--------------------+------+------+--------+---------+-----------+-----------+-----------+--------------+--------------------+----+-----+----------+---------+----+----------+--------------------+-----------+--------------------+--------+-------+------------------+-------+-----------+-------+-----------+-----+--------+----------+------+-----+--------+-------+--------+------+---------+------+--------+--------+--------------------+--------+----------+--------------------+---------+-----------+------+---------+-----------+----------+----------+------------+---

In [12]:
from pyspark.sql.functions import col, when, lit

# Drop rows where longitude is either 0 or null
df_cluster = df_cluster.filter(col('longitude').isNotNull() & (col('longitude') != 0))

# Drop rows where city is '0' or null.
# The comparison is against the string '0' on purpose: comparing a string
# column with the integer 0 makes Spark cast the city name to a number, which
# evaluates to NULL for ordinary names, and a NULL predicate drops the row.
df_cluster = df_cluster.filter(col('city').isNotNull() & (col('city') != '0'))

# Drop rows where multiple is null
df_cluster = df_cluster.filter(col('multiple').isNotNull())

# Fill 'nperps' with 'nperpcap' values if available. This has to happen before
# the -99 placeholder fill below; afterwards 'nperps' is never null and the
# condition could not match any row.
df_cluster = df_cluster.withColumn(
    'nperps',
    when(col('nperps').isNull(), col('nperpcap')).otherwise(col('nperps'))
)

# Columns whose nulls become 'unknown'
columns_with_unknown = [
    'natlty1_txt', 'natlty1', 'natlty2_txt', 'natlty2',
    'natlty3_txt', 'natlty3', 'gsubname', 'gsubname2',
    'gsubname3', 'gname2', 'gname3', 'motive'
]

# Columns whose nulls become -1
columns_with_negative_one = [
    'guncertain2', 'guncertain3', 'claimed', 'compclaim',
    'weapsubtype1', 'weaptype2', 'weapsubtype2', 'weaptype3',
    'weapsubtype3', 'nkill', 'nkillus', 'nkillter',
    'nwound', 'nwoundus', 'nwoundte', 'targsubtype1',
    'guncertain1', 'claimmode', 'propvalue'
]

# Columns whose nulls become 'Unknown'
columns_with_unknown_text = [
    'summary', 'targsubtype1_txt', 'corp1', 'target1',
    'claimmode_txt', 'weapsubtype1_txt'
]

# Columns whose nulls become -99
columns_with_negative_ninety_nine = [
    'ishostkid', 'nhostkid', 'nhours', 'ndays', 'ransomamt',
    'ransomamtus', 'ransompaid', 'ransompaidus', 'nperps',
    'nperpcap'
]

# Collect every replacement into one {column: value} mapping so a single
# fillna() call handles them all. Listed columns that are absent from the
# DataFrame are skipped.
# NOTE(review): the dict form of fillna casts the replacement to the column's
# type, so 'unknown' presumably has no effect on numeric columns such as
# 'natlty1' - confirm against the actual schema.
fill_values = {}
for column_group, placeholder in (
    (columns_with_unknown, 'unknown'),
    (columns_with_negative_one, -1),
    (columns_with_unknown_text, 'Unknown'),
    (columns_with_negative_ninety_nine, -99),
):
    for column in column_group:
        if column in df_cluster.columns:
            fill_values[column] = placeholder

# These replacements are applied unconditionally: null 'ransom' becomes 0, and
# whatever is still null in 'nperps' / 'nperpcap' becomes -99.
fill_values.update({'ransom': 0, 'nperps': -99, 'nperpcap': -99})

df_cluster = df_cluster.fillna(fill_values)

# Special handling for 'nhours' where 999 should be replaced with -99
df_cluster = df_cluster.withColumn('nhours',
                                   when(col('nhours') == 999, -99).otherwise(col('nhours')))


In [14]:
# Columns removed from the cleaned DataFrame. Each name appears exactly once;
# the names are grouped by prefix so the list is easy to audit.
columns_to_drop = [
    # date columns
    'start_date', 'end_date', 'approxdate',
    # attacktype2/3
    'attacktype2', 'attacktype2_txt', 'attacktype3', 'attacktype3_txt',
    # second and third target fields
    'targtype2', 'targtype2_txt', 'targsubtype2', 'targsubtype2_txt',
    'corp2', 'target2', 'natlty2', 'natlty2_txt',
    'targtype3', 'targtype3_txt', 'targsubtype3', 'targsubtype3_txt',
    'corp3', 'target3', 'natlty3', 'natlty3_txt',
    # second and third group fields
    'gname2', 'gsubname2', 'gname3', 'gsubname3', 'guncertain2', 'guncertain3',
    # claim2/3
    'claim2', 'claimmode2', 'claimmode2_txt',
    'claim3', 'claimmode3', 'claimmode3_txt',
    # weaptype2/3/4
    'weaptype2', 'weaptype2_txt', 'weapsubtype2', 'weapsubtype2_txt',
    'weaptype3', 'weaptype3_txt', 'weapsubtype3', 'weapsubtype3_txt',
    'weaptype4', 'weaptype4_txt', 'weapsubtype4', 'weapsubtype4_txt',
    # remaining count / incident fields
    'nwoundus', 'nkillus', 'nhostkidus', 'divert', 'kidhijcountry', 'ransomnote',
    # prop* fields
    'propextent', 'propextent_txt', 'propcomment',
    # notes, citations and other descriptive fields
    'addnotes', 'scite1', 'scite2', 'scite3', 'dbsource',
    'location', 'alternative', 'alternative_txt',
]

# Drop the specified columns from the PySpark DataFrame
df_cluster = df_cluster.drop(*columns_to_drop)


# Writing the DataFrame to the local MongoDB

In [None]:
# Target: collection 'gtd_clean_by_spark' in database 'GTD' on the local server.
mongo_ip = "mongodb://localhost:27017/GTD.gtd_clean_by_spark"

# No save mode is set here, so the writer's default mode applies.
local_writer = df_cluster.write.format("com.mongodb.spark.sql.DefaultSource")
local_writer.option("uri", mongo_ip).save()

# Writing the DataFrame to the MongoDB cluster

In [None]:
from pyspark.sql import SparkSession

# Target collection on the cluster. The connection string is derived from the
# `mongo_uri` defined in the cluster-connection cell, so the credentials are
# not repeated in this cell.
cluster_output_uri = f"{mongo_uri}.gtd_clean_by_spark"

# Point the session's default output URI at the real target (instead of a
# "<username>:<password>" placeholder) and allow 60 s for the connection.
spark = SparkSession.builder \
    .appName("MongoDB Integration") \
    .config("spark.mongodb.output.uri", cluster_output_uri) \
    .config("spark.mongodb.output.connection.timeout", "60000") \
    .getOrCreate()

# Write DataFrame to MongoDB, using save mode "overwrite".
df_cluster.write.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", cluster_output_uri) \
    .mode("overwrite") \
    .save()
