In [1]:
#!/usr/bin/env python
# coding: utf-8

# --- NOTES -------------------------------------------------------------------
# 1. Update the datasets, dataList
# -----------------------------------------------------------------------------

import os
import re
import csv
import sys
import json
import time
import pyspark
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import DoubleType
from copy import deepcopy
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.functions import udf, unix_timestamp, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType, TimestampType

In [2]:
 # Input & output directories
# Configuration cell: path of the cluster file list plus the input and output
# directories. The statements sit at column 0 on purpose: this cell precedes
# the `if __name__ == "__main__":` cell, so indented statements here raise
# "IndentationError: unexpected indent".

# NOTE(review): absolute path inside one user's home directory; it will not
# resolve on other machines. The relative alternative is kept below.
#file_list = "../cluster3.txt"
file_list = "/home/ted/school/big_data/project/big_data_course_project/cluster3.txt"

# use these if running locally (Jupyter, etc)
inputDirectory = "../raw_data/"
outputDirectory = "../output_data/"#sys.argv[2]

# use these if running on DUMBO
# NOTE(review): the commented output path below needs `this_user`, which is
# not assigned in this cell - define it before enabling that line.
#inputDirectory = "/user/hm74/NYCColumns/"#sys.argv[1]
#outputDirectory = "/user/" + this_user + "/project/task1/"#sys.argv[2]

In [3]:
if __name__ == "__main__":
    # Set up the Spark entry points (SparkContext, SparkSession, SQLContext).
    # SparkContext.getOrCreate() reuses a context that is already running, so
    # this cell can be executed again in the same kernel; a bare SparkContext()
    # raises "Cannot run multiple SparkContexts at once" on the second run.
    sc = SparkContext.getOrCreate()
    spark = SparkSession \
        .builder \
        .appName("project_task1") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

    # Current user name, needed only by the commented-out DUMBO output path.
    # USER is not defined in every environment (e.g. Windows uses USERNAME),
    # so fall back instead of raising KeyError.
    env_var = os.environ
    this_user = env_var.get('USER') or env_var.get('USERNAME', '')

    # Input & output directories. Use these if running on DUMBO
    #inputDirectory = "/user/hm74/NYCOpenData/"#sys.argv[1]
    #outputDirectory = "/user/" + this_user + "/project/task1/"#sys.argv[2]

    # use these if running locally (Jupyter, etc)
    inputDirectory = "../raw_data/"
    outputDirectory = "../output_data/"#sys.argv[2]

    # Template for one entry of the output JSON semantic schema
    semanticSchema = {
        "semantic_type": "",
        "count": 0
    }

    # Regular-expression source strings keyed by semantic type.
    # Patterns containing backslashes are raw strings so that "\d", "\-", "\("
    # etc. are not treated as (invalid) Python string escapes; the resulting
    # string values are unchanged.
    # NOTE(review): pattern contents are kept exactly as they were because
    # their consumers are not visible here. Things to confirm:
    #   - "%ST%" / "%RD%" look like SQL LIKE wildcards; in a regular expression
    #     "%" is a literal character.
    #   - the "." in "WWW.", ".COM", "I.S.", "M.S.", "P.S" is unescaped and
    #     therefore matches any character.
    #   - the key "PhoneBumber" looks like a typo for "PhoneNumber".
    #   - the second "ZipCode" alternative allows three digits after the dash;
    #     ZIP+4 codes have four.
    expr_dic = {"Street": "ROAD|STREET|PLACE|DRIVE|BLVD|%ST%|%RD%|DR|AVENUE",
                "Website" : "WWW.|.COM|HTTP://",
                "BuildingCode" : r"([A-Z])\d\-",
                "PhoneBumber": r"\d\d\d\d\d\d\d\d\d\d|\(\d\d\d\)\d\d\d\d\d\d\d|\d\d\d\-\d\d\d\-\d\d\d\d",
                "ZipCode": r"\d\d\d\d\d|\d\d\d\d\d\-\d\d\d|\d\d\d\d\d\d\d\d",
                "Lat_Lon" : r"\([-+]?[0-9]+\.[0-9]+\,\s*[-+]?[0-9]+\.[0-9]+\)",
                "SchoolName" : "SCHOOL|ACADEMY|HS|ACAD|I.S.|IS|M.S.|P.S|PS|YABC",
                }

In [15]:
    # Load the cluster3 file list: split every line on commas, then strip list
    # brackets, single quotes and spaces from each entry so that only the bare
    # file names remain.
    raw_data = sc.textFile("cluster3.txt")
    raw_list = raw_data.flatMap(lambda x: x.split(",")).collect()
    # One raw-string character class replaces the two former re.sub passes
    # (whose patterns contained invalid "\[" escapes and an empty alternative);
    # the set of removed characters is the same: [ ] ' and space.
    raw_list = [re.sub(r"[\[\]' ]", "", item) for item in raw_list]

    # Counters used in the progress messages below
    processCount = 1
    j = 0

    # Schema of the raw data: a string value and its integer count.
    # It is passed explicitly to the reader, so no schema inference is needed.
    customSchema = StructType([
               StructField("val", StringType(), True),
               StructField("count", IntegerType(), True)])

    # note: the following scaler process is taken from:
    # https://stackoverflow.com/questions/40337744/scalenormalise-a-column-in-spark-dataframe-pyspark
    # UDF for converting column type from vector to double type (rounded to 3
    # decimals). It does not depend on the file, so it is built once here.
    unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

    # Iterate over the data files by name.
    # NOTE: the slice currently selects only the entry at index 33; widen it
    # to process more files.
    for filename in raw_list[33:34]:

        print("Processing Dataset =========== : ", str(processCount) + ' - ' +filename)
        # Tab-separated file without header, read with the explicit schema
        # ("inferSchema" was dropped: it has no effect once a schema is given).
        df = sqlContext.read.format("csv").option("header",
        "false").option("delimiter",
        "\t").schema(customSchema).load(inputDirectory + filename)

        # Total of the "count" column. The DataFrame aggregate returns None
        # for an empty frame (mapped to 0 here) and skips null counts, where
        # the former RDD reduce raised IndexError / TypeError.
        count_all = df.groupBy().sum("count").first()[0] or 0

        for column_name in ["count"]:
            # VectorAssembler Transformation - Converting column to vector type
            assembler = VectorAssembler(inputCols=[column_name],
                                        outputCol=column_name+"_Vect")

            # MinMaxScaler Transformation
            scaler = MinMaxScaler(inputCol=column_name+"_Vect",
                                  outputCol=column_name+"_Scaled")

            # Pipeline of VectorAssembler and MinMaxScaler
            pipeline = Pipeline(stages=[assembler, scaler])

            # Fit the pipeline, unwrap the scaled vector into a double column
            # and drop the intermediate vector column
            df = (pipeline.fit(df).transform(df)
                  .withColumn(column_name+"_Scaled", unlist(column_name+"_Scaled"))
                  .drop(column_name+"_Vect"))

        print("After Scaling :", j)
        df.sort(col("count").desc()).show()
        j += 1
        # Advance the dataset number shown in the "Processing Dataset" message
        # (it previously stayed at 1 for every file).
        processCount += 1
    
    
    
    
    
    
    
    
    
    
    

After Scaling : 0
+--------------------+-----+------------+
|                 val|count|count_Scaled|
+--------------------+-----+------------+
|   PASSENGER VEHICLE|63655|         1.0|
|SPORT UTILITY / S...|33161|       0.521|
|               SEDAN| 9457|       0.149|
|STATION WAGON/SPO...| 7792|       0.122|
|                TAXI| 3815|        0.06|
|             UNKNOWN| 3285|       0.052|
|       PICK-UP TRUCK| 2822|       0.044|
|                 VAN| 1570|       0.025|
|               OTHER| 1108|       0.017|
|          MOTORCYCLE|  556|       0.009|
|             BICYCLE|  533|       0.008|
|                 BUS|  518|       0.008|
|SMALL COM VEH(4 T...|  479|       0.008|
|LARGE COM VEH(6 O...|  448|       0.007|
|      LIVERY VEHICLE|  424|       0.007|
|           BOX TRUCK|  177|       0.003|
|                BIKE|   84|       0.001|
|TRACTOR TRUCK DIESEL|   81|       0.001|
|                  TK|   73|       0.001|
|                  BU|   63|       0.001|
+---------------

In [None]:
############################################################
""" END HERE"""
############################################################