EXPLORATORY DATA ANALYSIS

PROJECT: data-engineer-assignment

DATA: The squirrel census

TABLES: park-data.csv and squirrel-data.csv

In [2]:
#import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, to_date, date_format, concat, lit, ArrayType, StringType
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import udf
import pytz
from datetime import datetime
import pandas as pd
import spacy
import re


In [3]:
# Location of the input CSV files
path = 'data/'
files = ['park-data.csv', 'squirrel-data.csv']

# Start (or reuse) a Spark session, then load each CSV into its own DataFrame
spark = SparkSession.builder.appName("ReadingApp").getOrCreate()

df_pdata = spark.read.csv(path + files[0], header=True)  # park data
df_sdata = spark.read.csv(path + files[1], header=True)  # squirrel data

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/21 20:30:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## PARK DATA

In [4]:
# First look at the park table: row count, first 10 rows and the schema
park_row_count = df_pdata.count()
print(f'total number of rows: {park_row_count}')
df_pdata.show(10)
df_pdata.printSchema()


total number of rows: 25
+-----------------+-------+--------------------+-------+------+----------+----------+-------------------------------------+--------------------+----------------------+--------------------+---------------------+
|        Area Name|Area ID|           Park Name|Park ID|  Date|Start Time|  End Time|Total Time (in minutes, if available)|     Park Conditions|Other Animal Sightings|              Litter|Temperature & Weather|
+-----------------+-------+--------------------+-------+------+----------+----------+-------------------------------------+--------------------+----------------------+--------------------+---------------------+
|  UPPER MANHATTAN|      A|     Fort Tryon Park|      1|3/1/20|3:14:00 PM|4:05:00 PM|                                   51|                Busy|  Humans, Dogs, Pig...|                Some|    43 degrees, sunny|
|  UPPER MANHATTAN|      A| J. Hood Wright Park|      2|3/1/20|3:30:00 PM|4:00:00 PM|                                   30|        

In [5]:
# Summary statistics for every column of the park table
park_summary = df_pdata.describe()
park_summary.show()

24/07/21 20:13:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+---------------+-------+--------------------+-----------------+------+----------+----------+-------------------------------------+-----------------+----------------------+--------------------+---------------------+
|summary|      Area Name|Area ID|           Park Name|          Park ID|  Date|Start Time|  End Time|Total Time (in minutes, if available)|  Park Conditions|Other Animal Sightings|              Litter|Temperature & Weather|
+-------+---------------+-------+--------------------+-----------------+------+----------+----------+-------------------------------------+-----------------+----------------------+--------------------+---------------------+
|  count|             25|     25|                  25|               25|    25|        25|        25|                                   25|               23|                    24|                  19|                   23|
|   mean|           NULL|   NULL|                NULL|           12.532|  NULL|      NULL|      NULL|   

In [6]:
# Column names of the park table, reused by the per-column inspection below
col_names = df_pdata.schema.names
print(col_names)


['Area Name', 'Area ID', 'Park Name', 'Park ID', 'Date', 'Start Time', 'End Time', 'Total Time (in minutes, if available)', 'Park Conditions', 'Other Animal Sightings', 'Litter', 'Temperature & Weather']


In [10]:
# Print the distinct values of each park column
for element in col_names:
    distinct_rows = df_pdata.select(element).distinct().collect()
    # Each row holds a single field, so unwrap it into a plain value
    distinct_values = [row[0] for row in distinct_rows]
    print(f"Unique elements of '{element}':")
    print(distinct_values)
    print()

Unique elements of 'Area Name':
['CENTRAL MANHATTAN', 'UPPER MANHATTAN', 'BROOKLYN', 'LOWER MANHATTAN']

Unique elements of 'Area ID':
['B', 'D', 'C', 'A']

Unique elements of 'Park Name':
['City Hall Park', 'Riverside Park (section near Grant Memorial)', 'Corlears Hook Park', 'Union Square Park', 'Marcus Garvey Park', 'Sara D. Roosevelt Park (Section Below Delancey St)', 'Washington Square Park', 'Sternberg Park', 'Sara D. Roosevelt Park (Section Above Delancey St)', 'McCarren Park', 'Thomas Paine Park', 'Stuyvesant Square Park', 'Tompkins Square Park', 'Fort Tryon Park', 'St. Nicholas Park', 'Seward Park', 'John V. Lindsay East River Park', 'Msgr. McGolrick Park', 'Cooper Park', 'J. Hood Wright Park', 'Battery Park', 'Columbus Park', 'Madison Square Park', 'Highbridge Park', 'Teardrop Park']

Unique elements of 'Park ID':
['7', '15', '11', '3', '8', '22', '16', '5', '18', '17', '6', '19', '23', '9', '24', '13.2', '1', '20', '10', '4', '12', '14', '21', '2', '13.1']

Unique elements o

In [10]:
# "Sara D. Roosevelt Park" appears twice (sections above and below Delancey St)
# with IDs 13.1 and 13.2. They are treated as different parks, so Park ID
# cannot be stored as an integer.
roosevelt_sections = [
    "Sara D. Roosevelt Park (Section Above Delancey St)",
    "Sara D. Roosevelt Park (Section Below Delancey St)",
]
filtered_park = df_pdata.filter(col("Park Name").isin(roosevelt_sections))
filtered_park.show()

+---------------+-------+--------------------+-------+------+----------+----------+-------------------------------------+---------------+----------------------+------+---------------------+
|      Area Name|Area ID|           Park Name|Park ID|  Date|Start Time|  End Time|Total Time (in minutes, if available)|Park Conditions|Other Animal Sightings|Litter|Temperature & Weather|
+---------------+-------+--------------------+-------+------+----------+----------+-------------------------------------+---------------+----------------------+------+---------------------+
|LOWER MANHATTAN|      C|Sara D. Roosevelt...|   13.1|3/1/20|3:30:00 PM|4:00:00 PM|                                   30|           Busy|   Humans, Dogs (Gray)|  Some|    44 degrees, sunny|
|LOWER MANHATTAN|      C|Sara D. Roosevelt...|   13.2|3/1/20|3:30:00 PM|4:00:00 PM|                                   30|           Busy|       Humans, Pigeons|  Some|    43 degrees, sunny|
+---------------+-------+--------------------+----

In [20]:
# NOTE(review): extract_condition is not defined in any cell visible here, so this
# cell relies on hidden kernel state -- define or import it before this cell.
# Assumes it returns an iterable of condition strings (or a falsy value) -- TODO confirm.
conditons = set()  # (sic) name kept as-is in case later cells reference it
name_column = df_pdata.select("Park Conditions").collect()

# Gather every distinct condition found in the 'Park Conditions' column
for row in name_column:
    condition = extract_condition(row["Park Conditions"])
    if condition:
        conditons.update(condition)


file_path = 'auxiliary_files/condition_list.txt'

# Save one condition per line. Set iteration order is not stable between runs,
# so sort (by the text that gets written) to keep the file reproducible.
with open(file_path, 'w') as file:
    for item in sorted(conditons, key=str):
        file.write(f"{item}\n")

print(f"List saved to {file_path}")

List saved to condition_list.txt


## SQUIRREL DATA

In [4]:
# First look at the squirrel table: row count, first 10 rows and the schema
squirrel_row_count = df_sdata.count()
print(f'total number of rows: {squirrel_row_count}')
df_sdata.show(10)
df_sdata.printSchema()

total number of rows: 433
+-------+-----------+-----------------+-----------------------+-----------+------------+-----------------------------+-----------------+--------------------+------------------------+---------------------------+-----------------------------+-------------------------------+
|Park ID|Squirrel ID|Primary Fur Color|Highlights in Fur Color|Color Notes|    Location|Above Ground (Height in Feet)|Specific Location|          Activities|Interactions with Humans|Other Notes or Observations|Squirrel Latitude (DD.DDDDDD)|Squirrel Longitude (-DD.DDDDDD)|
+-------+-----------+-----------------+-----------------------+-----------+------------+-----------------------------+-----------------+--------------------+------------------------+---------------------------+-----------------------------+-------------------------------+
|      1|    A-01-01|             Gray|                  White|       NULL|Ground Plane|                         NULL|             NULL|            Foragin

24/07/21 20:31:05 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
# Column names of the squirrel table (note: this rebinds col_names, which
# previously held the park table's columns)
col_names = df_sdata.schema.names
print(col_names)

['Park ID', 'Squirrel ID', 'Primary Fur Color', 'Highlights in Fur Color', 'Color Notes', 'Location', 'Above Ground (Height in Feet)', 'Specific Location', 'Activities', 'Interactions with Humans', 'Other Notes or Observations', 'Squirrel Latitude (DD.DDDDDD)', 'Squirrel Longitude (-DD.DDDDDD)']


In [11]:
# Print the distinct values of each squirrel column, skipping the last two
# columns (latitude and longitude)
for element in col_names[:-2]:
    distinct_rows = df_sdata.select(element).distinct().collect()
    # Each row holds a single field, so unwrap it into a plain value
    distinct_values = [row[0] for row in distinct_rows]
    print(f"Unique elements of '{element}':")
    print(distinct_values)
    print()

Unique elements of 'Park ID':
['7', '15', '11', '3', '8', '22', '16', '5', '18', '6', '19', '9', '1', '20', '10', '4', '12', '14', '21', '2']

Unique elements of 'Squirrel ID':
['B-11-34', 'C-19-14', 'C-20-19', 'A-04-01', 'A-05-05', 'A-02-20', 'B-10-11', 'D-22-42', 'B-09-02', 'B-11-54', 'C-20-03', 'D-22-33', 'C-20-15', 'D-22-05', 'D-22-32', 'C-20-13', 'B-07-05', 'C-19-10', 'C-20-16', 'A-04-12', 'B-11-35', 'A-05-07', 'A-06-15', 'B-10-02', 'B-12-11', 'C-14-03', 'C-19-01', 'B-11-45', 'C-20-21', 'D-22-04', 'B-11-26', 'B-11-28', 'A-03-11', 'D-22-06', 'D-22-41', 'A-02-09', 'B-08-06', 'A-03-07', 'B-10-40', 'C-20-24', 'A-02-13', 'A-04-05', 'C-14-07', 'A-05-10', 'C-19-05', 'B-11-31', 'C-20-07', 'C-20-11', 'D-22-22', 'B-09-07', 'B-10-07', 'A-03-14', 'A-06-03', 'B-11-05', 'D-22-07', 'A-02-18', 'B-10-18', 'B-11-32', 'B-07-01', 'B-11-51', 'A-05-04', 'A-06-17', 'B-10-47', 'C-19-16', 'B-10-51', 'B-09-03', 'B-10-42', 'C-14-04', 'D-22-31', 'A-05-20', 'A-06-01', 'A-06-05', 'B-07-07', 'B-10-48', 'B-12-10

In [12]:
_NLP_MODEL = None  # spaCy pipeline, loaded on first use and shared by all calls


def _get_nlp():
    """Return the 'en_core_web_sm' spaCy pipeline, loading it only once."""
    global _NLP_MODEL
    if _NLP_MODEL is None:
        _NLP_MODEL = spacy.load('en_core_web_sm')
    return _NLP_MODEL


def verb_extractor(activities_str):
    """Extract the distinct verbs from a free-text activities description.

    The text is cleaned before tagging: spans enclosed in [], () or {} are
    removed, '/' becomes a space, every apostrophe is replaced by 'g', the
    remaining punctuation is stripped and the result is lower-cased. Tokens
    that spaCy tags as VERB are then collected.

    Args:
        activities_str: Raw activities text; may be None or empty.

    Returns:
        A set with the text of every VERB token, or None when the input is
        None/empty or no verb is found.
    """
    if not activities_str:
        return None

    # Drop bracketed asides before any other clean-up
    act_output = re.sub(r'\[.*?\]', '', activities_str)
    act_output = re.sub(r'\(.*?\)', '', act_output)
    act_output = re.sub(r'\{.*?\}', '', act_output)
    act_output = act_output.replace('/', ' ')
    # Presumably restores clipped gerunds ("runnin'" -> "running"); note that it
    # rewrites every apostrophe, possessives included -- TODO confirm intent
    act_output = act_output.replace("'", 'g')
    act_output = re.sub(r'[^\w\s]', '', act_output)
    act_output = act_output.lower()

    # Reuse the cached pipeline instead of reloading the model on every call
    doc = _get_nlp()(act_output)

    verbs_set = {token.text for token in doc if token.pos_ == "VERB"}
    return verbs_set or None
        
        
verb_set = set()
name_column = df_sdata.select("Activities").collect()

# Gather every distinct verb found in the 'Activities' column
for row in name_column:
    verbs = verb_extractor(row["Activities"])
    if verbs:
        verb_set.update(verbs)


file_path = 'auxiliary_files/verb_list.txt'

# Save one verb per line. Set iteration order is not stable between runs,
# so sort the verbs to keep the file reproducible.
with open(file_path, 'w') as file:
    for item in sorted(verb_set):
        file.write(f"{item}\n")

print(f"List saved to {file_path}")