In [1]:
!pip install pymongo
!pip install pyspark
!pip install pandas psycopg2
!pip install sqlalchemy
!pip install psycopg2-binary



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import lit
import pymongo
from pymongo import MongoClient
import pandas as pd
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, expr, trim, when

In [3]:
# Create a client for the MongoDB server on localhost:27017, then select
# the `Call_Data` collection inside the `LAPD` database.
client = MongoClient(host='localhost', port=27017)
db = client.get_database('LAPD')
collection = db.get_collection('Call_Data')

In [4]:
# Read every document in the collection into memory. Each document's `_id`
# (an ObjectId) is replaced with its string form, presumably so pandas/Spark
# can treat it as a plain string column. The field order of each document is kept.
data = [{**doc, "_id": str(doc["_id"])} for doc in collection.find()]

In [5]:
# One row per fetched document; the columns are the union of the documents' fields.
df = pd.DataFrame(data=data)

In [6]:
# Preview the first five raw records. As the cell's last expression, this renders
# as a rich HTML table in Jupyter instead of print()'s wrapped plain-text dump.
df.head(5)

                        _id   Incident_Number     Area_Occ  Rpt_Dist  \
0  668727f70dfcff0bb9b75798   LPD190704000047  N Hollywood    1532.0   
1  668727f70dfcff0bb9b75799   LPD190529006334      Outside       NaN   
2  668727f70dfcff0bb9b7579a   LPD190218001024    Southwest     379.0   
3  668727f70dfcff0bb9b7579b  PD19120600001346    Northeast    1109.0   
4  668727f70dfcff0bb9b7579c   LPD190216004881     Van Nuys     933.0   

            Dispatch_Date Dispatch_Time Call_Type_Code Call_Type_Text  
0  07/04/2019 12:00:00 AM      00:14:49           507F      FIREWORKS  
1  05/29/2019 12:00:00 AM      21:28:53              6         CODE 6  
2  02/18/2019 12:00:00 AM      07:46:54           507C   CONSTRUCTION  
3  12/06/2019 12:00:00 AM      08:32:27              6         CODE 6  
4  02/16/2019 12:00:00 AM      21:14:49           507P          PARTY  


In [7]:
# Start (or reuse) the SparkSession used by the rest of the notebook.
# - spark.driver.memory=4g raises the driver heap, presumably to hold the
#   pandas -> Spark conversion. It only takes effect if this call launches the
#   JVM; on an already-running session, getOrCreate() cannot change it.
# - spark.sql.legacy.timeParserPolicy=LEGACY selects the legacy date/time
#   parser. NOTE(review): no visible cell parses dates with Spark, so confirm
#   this setting is still needed.
spark = (
    SparkSession.builder
    .appName("Pandas to PySpark")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

In [8]:
# Hand the pandas frame to Spark. No schema is passed, so Spark infers the
# column types from the pandas data.
call_data_df = spark.createDataFrame(data=df)

In [10]:
call_data_df.show(n=4)  # first four rows; long values are truncated in the display

+--------------------+----------------+-----------+--------+--------------------+-------------+--------------+--------------+
|                 _id| Incident_Number|   Area_Occ|Rpt_Dist|       Dispatch_Date|Dispatch_Time|Call_Type_Code|Call_Type_Text|
+--------------------+----------------+-----------+--------+--------------------+-------------+--------------+--------------+
|668727f70dfcff0bb...| LPD190704000047|N Hollywood|  1532.0|07/04/2019 12:00:...|     00:14:49|          507F|     FIREWORKS|
|668727f70dfcff0bb...| LPD190529006334|    Outside|     NaN|05/29/2019 12:00:...|     21:28:53|             6|        CODE 6|
|668727f70dfcff0bb...| LPD190218001024|  Southwest|   379.0|02/18/2019 12:00:...|     07:46:54|          507C|  CONSTRUCTION|
|668727f70dfcff0bb...|PD19120600001346|  Northeast|  1109.0|12/06/2019 12:00:...|     08:32:27|             6|        CODE 6|
+--------------------+----------------+-----------+--------+--------------------+-------------+--------------+--------

In [12]:
# Strip the trailing time-of-day (e.g. " 12:00:00 AM") so Dispatch_Date keeps
# only its MM/DD/YYYY date text. The result is still a string column; it is not
# converted to a Spark DateType.
# The pattern:
#   - matches the meridiem as `[AP]M` (the old `[APM]{2}` character class
#     also accepted "MM", "PA", "AA", ...);
#   - accepts one- or two-digit hours ("1:05:00 PM" as well as "12:00:00 AM");
#   - is anchored to the end of the value, so only a trailing time is removed,
#     along with any trailing whitespace.
call_for_service_df = call_data_df.withColumn(
    "Dispatch_Date",
    regexp_replace("Dispatch_Date", r"\s+\d{1,2}:\d{2}:\d{2}\s+[AP]M\s*$", ""),
)


In [13]:
# Fill missing Rpt_Dist values with "NaN". Rpt_Dist prints as numeric above
# (1532.0 / NaN), so the string is presumably cast to that column type by
# Spark; confirm against the schema if this matters downstream.
call_for_service_df = call_for_service_df.fillna({"Rpt_Dist": "NaN"})

# Preview the transformed data (Spark's default of 20 rows).
call_for_service_df.show(n=20)

+--------------------+----------------+-----------+--------+-------------+-------------+--------------+--------------------+
|                 _id| Incident_Number|   Area_Occ|Rpt_Dist|Dispatch_Date|Dispatch_Time|Call_Type_Code|      Call_Type_Text|
+--------------------+----------------+-----------+--------+-------------+-------------+--------------+--------------------+
|668727f70dfcff0bb...| LPD190704000047|N Hollywood|  1532.0|   07/04/2019|     00:14:49|          507F|           FIREWORKS|
|668727f70dfcff0bb...| LPD190529006334|    Outside|     NaN|   05/29/2019|     21:28:53|             6|              CODE 6|
|668727f70dfcff0bb...| LPD190218001024|  Southwest|   379.0|   02/18/2019|     07:46:54|          507C|        CONSTRUCTION|
|668727f70dfcff0bb...|PD19120600001346|  Northeast|  1109.0|   12/06/2019|     08:32:27|             6|              CODE 6|
|668727f70dfcff0bb...| LPD190216004881|   Van Nuys|   933.0|   02/16/2019|     21:14:49|          507P|               PARTY|


In [17]:
# Drop the MongoDB `_id` column so it is not part of the exported data.
columns_to_remove = ["_id"]
call_for_service_df = call_for_service_df.drop(
    *columns_to_remove
)

# Preview the result (Spark's default of 20 rows).
call_for_service_df.show(n=20)

+----------------+-----------+--------+-------------+-------------+--------------+--------------------+
| Incident_Number|   Area_Occ|Rpt_Dist|Dispatch_Date|Dispatch_Time|Call_Type_Code|      Call_Type_Text|
+----------------+-----------+--------+-------------+-------------+--------------+--------------------+
| LPD190704000047|N Hollywood|  1532.0|   07/04/2019|     00:14:49|          507F|           FIREWORKS|
| LPD190529006334|    Outside|     NaN|   05/29/2019|     21:28:53|             6|              CODE 6|
| LPD190218001024|  Southwest|   379.0|   02/18/2019|     07:46:54|          507C|        CONSTRUCTION|
|PD19120600001346|  Northeast|  1109.0|   12/06/2019|     08:32:27|             6|              CODE 6|
| LPD190216004881|   Van Nuys|   933.0|   02/16/2019|     21:14:49|          507P|               PARTY|
|PD19122100000089|  Southwest|   392.0|   12/21/2019|     00:21:03|             6|              CODE 6|
| LPD190719001920|    Outside|     NaN|   07/19/2019|     09:56:

In [18]:
# Destination for the exported CSV. Set the LAPD_OUTPUT_PATH environment
# variable to run the notebook on another machine. The default is the original
# author's local path, so existing behaviour is unchanged.
# Note: Spark's DataFrameWriter.csv treats this path as a directory and writes
# part file(s) inside it, despite the ".csv" suffix.
import os

output_path = os.environ.get(
    "LAPD_OUTPUT_PATH",
    r"C:\Users\16395\Downloads\final project\bigdatasource\LAPD\mongodb_pyspark_transformation\output.csv",
)

In [20]:
# Merge everything into one partition (coalesce avoids a full shuffle) so the
# CSV writer emits a single part file. Every row then passes through one task.
call_for_service_df = call_for_service_df.coalesce(numPartitions=1)

In [23]:
# Export as CSV with a header row, replacing any previous output at output_path.
# Spark creates a directory at that path holding the part file(s), not a lone
# file named output.csv.
(
    call_for_service_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)