# **IMPORTANT NOTE:**

This notebook is for VIEW ONLY. To test and run it, download the Zeppelin notebook, upload it to Peel, and run it there: [Cleaning_Dataset_peel_Accuracy.zpln](https://github.com/qyc206/evq_big_data_project/blob/main/notebooks/part2/Cleaning_Dataset_peel_Accuracy.zpln).

## II. Accuracy

During profiling, we used the [uszips.csv](https://drive.google.com/file/d/1qd2cXgTx-h-hRd0C7z2s_U4O8VYLAXA7/view?usp=sharing) dataset to uncover several inaccuracies in the City column of our 311 service dataset. To solve this problem, we match the zip codes in our dataset against those in the [uszips.csv](https://drive.google.com/file/d/1qd2cXgTx-h-hRd0C7z2s_U4O8VYLAXA7/view?usp=sharing) dataset to find the actual city values and use them to replace the inaccurate ones.
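
As a rough illustration of the zip-based lookup idea, the sketch below uses a plain Python dictionary in place of the reference dataset. The two sample rows and the helper name `city_for_zip` are assumptions made for illustration only; the notebook itself works on Spark DataFrames.

```python
# Illustrative reference rows (zip -> city); assumed sample values, not read from uszips.csv
zip_to_city = {10001: "New York", 11201: "Brooklyn"}

def city_for_zip(incident_zip, reported_city):
    """Return the reference city for a zip code, or the reported city if the zip is unknown."""
    return zip_to_city.get(incident_zip, reported_city)

# A misspelled city with a known zip is replaced by the reference value
print(city_for_zip(11201, "BROOKLYM"))
# An unknown zip leaves the reported city unchanged
print(city_for_zip(99999, "ALBANY"))
```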

Before we start, download the [uszips.csv](https://drive.google.com/file/d/1qd2cXgTx-h-hRd0C7z2s_U4O8VYLAXA7/view?usp=sharing) dataset and upload it to HDFS as described in the "Upload the dataset to Peel cluster & Define dataset path" section. Once the reference dataset is in HDFS, run the cells that define the dataset path and make sure the dataset can be read.

## Upload the dataset to Peel cluster & Define dataset path

Before continuing, make sure your dataset is available on Peel HDFS. If your dataset is on your local machine, you can copy it to the login node of the cluster and move it to your user directory in HDFS using the following commands:

```
# Copy file from local machine to login node of the cluster
mylaptop$ scp -r [FILENAME] [netid]@peel.hpc.nyu.edu:~/

# Move file from cluster login node to your user directory in HDFS 
# (your file will be in the path "/user/[netid]/[FILENAME]")
hfs -put [FILENAME]
```

Make sure you can locate your dataset before continuing onwards.

In [None]:
%pyspark
# Define path to dataset on Peel HDFS (NOTE: replace file name with your own if different)
dataset_path = "/user/CS-GY-6513/project_data/data-cityofnewyork-us.erm2-nwe9.csv"

## Set up Spark Session

Now that the dataset is uploaded and the path is defined, we need to set up pyspark to begin profiling and exploring our dataset. 

If this notebook is run in an environment where pyspark is not yet installed, please add a new cell BEFORE the next cell and run the following command:

```
# Run this command if pyspark is not already installed
%pip install pyspark
```


In [None]:
%pyspark

# Set up pyspark session
from pyspark.sql import SparkSession

spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .config("spark.some.config.option", "some-value") \
            .config("spark.executor.memory", "35g") \
            .config("spark.driver.memory", "35g") \
            .getOrCreate()

## Load dataset using spark

Run the following lines to load the dataset using Spark and register it as a temporary view.

In [None]:
%pyspark

# Load dataset
df = spark.read.format('csv').options(header='true',inferschema='true').load(dataset_path)
# (Note: change "311_service_report" to a name that better suits your dataset, if different)
df.createOrReplaceTempView("311_service_report") 

### Generalizing Formatting

For many datasets, extracting information from a column that involves time requires converting the column to a timestamp type. However, the data in the column must match the format that is specified when calling the to_timestamp() function ( to_timestamp(dataset[column], format) ). It is therefore best to generalize this part of the formatting so that all of our date columns are uniform. This is even more essential because some of our solutions involve dates.
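
To make the format requirement concrete, here is a minimal sketch using Python's standard `datetime` module rather than Spark. It assumes that the strptime pattern `%m/%d/%Y %I:%M:%S %p` is the counterpart of the Spark pattern `MM/dd/yyyy hh:mm:ss a` used later in this notebook; the helper name `parse_or_none` and the sample strings are hypothetical.

```python
from datetime import datetime

# Assumed strptime counterpart of the Spark pattern "MM/dd/yyyy hh:mm:ss a"
PY_FORMAT = "%m/%d/%Y %I:%M:%S %p"

def parse_or_none(text, fmt=PY_FORMAT):
    """Parse text with fmt; return None when the text does not match the format."""
    try:
        return datetime.strptime(text, fmt)
    except ValueError:
        return None

# A string in the expected layout parses to a datetime
matching = parse_or_none("01/31/2020 02:15:00 PM")
# A string in a different layout does not parse
mismatched = parse_or_none("2020-01-31 14:15:00")
print(matching, mismatched)
```

The point of the sketch is only that a value is usable as a timestamp when its layout agrees with the declared pattern, which is why every date column is passed through the same helper with an explicit format.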

In [None]:
%pyspark

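from pyspark.sql.functions import to_timestamp

def formatDate(dataset, col, DateForm):
    # Parse the string column `col` into a timestamp using the pattern DateForm
    formatedData = dataset.withColumn(col,to_timestamp(dataset[col],DateForm))
    return formatedData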

In [None]:
%pyspark

from pyspark.sql.types import IntegerType, LongType, DoubleType
from pyspark.sql.functions import to_timestamp

# Type casting to expected types
df = df.withColumn("Unique Key",df["Unique Key"].cast(IntegerType()))
df = formatDate(df,"Due Date","MM/dd/yyyy hh:mm:ss a")
df = formatDate(df,"Created Date","MM/dd/yyyy hh:mm:ss a")
df = formatDate(df,"Closed Date","MM/dd/yyyy hh:mm:ss a")
df = df.withColumn("Incident Zip",df["Incident Zip"].cast(IntegerType()))
# BBL is a 10-digit identifier that can exceed the 32-bit integer range, so cast it to long
df = df.withColumn("BBL",df["BBL"].cast(LongType()))
df = df.withColumn("X Coordinate (State Plane)",df["X Coordinate (State Plane)"].cast(IntegerType()))
df = df.withColumn("Y Coordinate (State Plane)",df["Y Coordinate (State Plane)"].cast(IntegerType()))
df = formatDate(df,"Resolution Action Updated Date","MM/dd/yyyy hh:mm:ss a")


# (Note: change "311_service_report" to a name that better suits your dataset, if different)
df.createOrReplaceTempView("311_service_report")

df.printSchema()

In [None]:
%pyspark

# Run to remove cache
df.unpersist()

Now that pyspark is set up and the columns of the dataset are updated to types that we expect, we can start using pyspark to explore and clean the dataset!

In [None]:
%pyspark

# Define path for US zip dataset
# (Note: make sure to update to your netid and dataset name)
uszip_path = "/user/qyc206/uszips.csv"

In [None]:
%pyspark

from pyspark.sql.types import IntegerType, DoubleType

# Read the US zip dataset
us = spark.read.csv(uszip_path, header=True)
us = us.withColumn("zip",us["zip"].cast(IntegerType()))
us = us.withColumn("lat",us["lat"].cast(DoubleType()))
us = us.withColumn("lng",us["lng"].cast(DoubleType()))
us.show()

In [None]:
%pyspark

# Run to remove cache
us.unpersist()

In [None]:
%pyspark

# All the cities in New York with their zip codes
ny_data = (us.filter(us["state_id"].like('NY'))).select(us["zip"],us["city"]) 

In [None]:
%pyspark

# Run to remove cache
ny_data.unpersist()

Run the following cells to solve the inaccuracy problem.

In [None]:
%pyspark

def calculate_distinct(col,dataoriginal,get_option="count"):
    distinct_vals = dataoriginal.select(col).distinct()
    if get_option=="count":
        return distinct_vals.count()
    elif get_option == "distinct":
        return distinct_vals

In [None]:
%pyspark

def find_wrong_vals(correct_dataset,dataset_to_clean,col_in_correct,col_to_be_cleaned):
    """
    Returns two lists of Rows: the distinct correct values from the reference dataset,
    and the distinct values in the dataset to clean that are not among the correct values
    """
    col_vals = calculate_distinct(col_in_correct,correct_dataset,"distinct")
    list_of_correct_vals = col_vals.collect()
    vals_in_dataset = calculate_distinct(col_to_be_cleaned,dataset_to_clean,"distinct")
    list_of_vals_in_dataset = vals_in_dataset.collect()
    # Finding the incorrect names (a set makes each membership test constant time)
    correct_set = set(list_of_correct_vals)
    list_of_wrong_vals = [i for i in list_of_vals_in_dataset if i not in correct_set]
    return list_of_correct_vals,list_of_wrong_vals
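
The check performed by `find_wrong_vals` boils down to a set difference between the observed values and the reference values. A minimal pure-Python sketch of that idea follows; the function name `wrong_values` and the sample city lists are hypothetical and stand in for the collected Spark rows.

```python
def wrong_values(correct_vals, observed_vals):
    """Return observed values absent from the reference values, sorted for stable output."""
    correct = set(correct_vals)
    return sorted(v for v in set(observed_vals) if v not in correct)

# Hypothetical sample values for illustration
reference = ["Brooklyn", "Bronx", "New York"]
observed = ["Brooklyn", "Brookyln", "Bronx", "Nw York"]
print(wrong_values(reference, observed))
```

Note that the comparison is exact and case-sensitive, so a value such as "BROOKLYN" would be reported as wrong against a reference entry of "Brooklyn".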

In [None]:
%pyspark

def levenshteinDistance(s1, s2):
    if len(s1) > len(s2):
        s1, s2 = s2, s1

    distances = range(len(s1) + 1)
    for i2, c2 in enumerate(s2):
        distances_ = [i2+1]
        for i1, c1 in enumerate(s1):
            if c1 == c2:
                distances_.append(distances[i1])
            else:
                distances_.append(1 + min((distances[i1], distances[i1 + 1], distances_[-1])))
        distances = distances_
    return distances[-1]

def clean_col(list_of_correct_vals,list_of_mispellings):
    """
    Calculates the Levenshtein distance between each misspelled value and every correct value,
    and maps each misspelling to the correct value with the minimum distance
    """
    list_new_values = {}
    for i in list_of_mispellings:
        matched = ""
        min_val = 100000000
        for j in list_of_correct_vals:
            try:
                curr_dist = levenshteinDistance(i[0], j[0])
                if curr_dist<min_val:
                    min_val =curr_dist
                    matched = j[0]
            except TypeError:
                # Skip null values, which have no length
                continue
        list_new_values[str(i[0])] = str(matched)
    return list_new_values
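
To show how the nearest-match step behaves on small inputs, here is a self-contained sketch. It is a compact re-implementation for illustration, not the cell above: the names `edit_distance` and `closest` and the sample city names are assumptions.

```python
def edit_distance(a, b):
    """Levenshtein distance between strings a and b, computed row by row."""
    previous = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            current.append(min(previous[j] + 1,          # deletion
                               current[j - 1] + 1,       # insertion
                               previous[j - 1] + cost))  # substitution or match
        previous = current
    return previous[-1]

def closest(value, candidates):
    """Return the candidate with the smallest edit distance to value."""
    return min(candidates, key=lambda c: edit_distance(value, c))

# Hypothetical sample values for illustration
candidates = ["Brooklyn", "Bronx", "New York"]
print(edit_distance("Brookyln", "Brooklyn"))
print(closest("Brookyln", candidates))
```

One design consequence worth keeping in mind: the closest candidate is always returned, even when the minimum distance is large, so a value that is not a misspelling at all is still mapped to some reference city.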
            

In [None]:
%pyspark

list_of_correct_cities,list_of_wrong_cities = find_wrong_vals(ny_data,df,"city","City")
print("Number of wrong cities before cleaning: "+str(len(list_of_wrong_cities)))

In [None]:
%pyspark

list_of_new_values = clean_col(list_of_correct_cities,list_of_wrong_cities)

Run the profiling code here so that its results can be compared with the cleaned results to see the improvement.

In [None]:
%pyspark

# Shows a dictionary of {valueToBeCorrected: valueToCorrectTo}. Note that running this cell could cause heap issues when running later cells
list_of_new_values

In [None]:
%pyspark

keys = list(list_of_new_values.keys())
values = list(list_of_new_values.values())
df = df.replace(keys,values,"City")

In [None]:
%pyspark

# Run to remove cache
df.unpersist()

In [None]:
%pyspark

# So that memory does not get overloaded
list_of_new_values.clear()
list_of_correct_cities.clear()
keys.clear()
values.clear()

del list_of_new_values
del list_of_correct_cities
del keys
del values