In [21]:
def stop_if_running(namespace, names=("spark", "sc")):
    """Stop any leftover Spark handles found in ``namespace``.

    On a fresh kernel neither ``spark`` nor ``sc`` exists yet, so calling
    ``spark.stop()`` unconditionally raises ``NameError`` and breaks
    "Restart & Run All". Names that are missing (or bound to ``None``) are
    skipped.

    Parameters
    ----------
    namespace : dict
        Mapping of variable names to objects, typically ``globals()``.
    names : iterable of str
        Variable names to look up, stopped in the given order.

    Returns
    -------
    list of str
        The names whose objects were actually stopped.
    """
    stopped = []
    for name in names:
        handle = namespace.get(name)
        if handle is not None:
            handle.stop()
            stopped.append(name)
    return stopped


# Tear down whatever session a previous run left behind (same order as before:
# the session first, then the context).
stop_if_running(globals())

In [22]:
# findspark.init() runs before the `pyspark` imports in the next cell --
# presumably so that PySpark becomes importable from this kernel; confirm
# against the findspark documentation if the environment changes.
import findspark
findspark.init()

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import sys
import string
from csv import reader
from functools import reduce
from pyspark.sql import functions as f
from collections import defaultdict
import datetime

In [24]:
# Build the session for this notebook (application name "TableCollections")
# and keep a handle on its SparkContext for RDD-level calls.
spark = (
    SparkSession.builder
    .appName("TableCollections")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
sc = spark.sparkContext

In [4]:
# Both homework CSVs carry a header row and are loaded with schema inference on.
csv_options = dict(header='true', inferschema='true')

parkingTable = (
    spark.read
    .format('csv')
    .options(**csv_options)
    .load('/user/ecc290/HW1data/parking-violations-header.csv')
)
openTable = (
    spark.read
    .format('csv')
    .options(**csv_options)
    .load('/user/ecc290/HW1data/open-violations-header.csv')
)

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import sys
import string
from csv import reader
from functools import reduce
from pyspark.sql import functions as f
from collections import defaultdict
import datetime
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType, StringType

class TableCollections:
    """Registry of Spark tables with per-column min/max metadata kept as CSV.

    For every registered table, the min and max of each numeric column and
    each timestamp column are written once to ``<name>_num_metadata.csv`` and
    ``<name>_time_metadata.csv`` (``^``-separated, no header). The range
    queries then read only these small files to find the columns whose values
    lie strictly inside a given interval.

    No metadata is produced for string columns or any other column type.
    """

    # Spark SQL type names, as reported by ``DataFrame.dtypes``, that are
    # treated as numeric. Decimals carry a precision suffix and are matched
    # by prefix in ``_isNumericType``.
    _NUMERIC_DTYPES = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, spark, sc):
        """Keep the session/context and open the Hadoop FileSystem used for existence checks."""
        self.spark = spark
        self.sc = sc
        self.tableNames = []
        self.fs = self.sc._jvm.org.apache.hadoop.fs.FileSystem.get(self.sc._jsc.hadoopConfiguration())

    def add_registered_table_name(self, name):
        """Track ``name`` as registered; a name already tracked is not added twice.

        Duplicates would make the range queries union the same metadata file
        several times and return repeated rows.
        """
        if name not in self.tableNames:
            self.tableNames.append(name)

    @staticmethod
    def _isNumericType(dtype):
        """Return True when a ``DataFrame.dtypes`` type string denotes a numeric column."""
        return dtype in TableCollections._NUMERIC_DTYPES or dtype.startswith('decimal')

    def _pathExists(self, path):
        """Return whether ``path`` exists on the configured Hadoop filesystem."""
        return self.fs.exists(self.sc._jvm.org.apache.hadoop.fs.Path(path))

    def register(self, df, name):
        """Register ``df`` as temp view ``name`` and create its metadata files if missing.

        Column names are first stripped of surrounding whitespace, dots and
        backticks. A metadata file that already exists is left untouched and a
        message is printed instead.
        """
        # Clean up column names so that we can prevent future errors
        for colName, _ in df.dtypes:
            cleaned = colName.strip().replace(".", "").replace("`", "")
            if cleaned != colName:
                df = df.withColumnRenamed(colName, cleaned)

        # track down which tables have been registered to the class
        self.add_registered_table_name(name)
        df.createOrReplaceTempView(name)  # can be problematic

        # Only genuinely numeric and timestamp columns get metadata. Other
        # types (string, boolean, date, ...) are skipped: describe() does not
        # summarise all of them, so treating "everything else" as numeric
        # crashes on the first such column.
        num_cols = [colName for colName, dtype in df.dtypes if self._isNumericType(dtype)]
        time_cols = [colName for colName, dtype in df.dtypes if dtype == 'timestamp']

        numFileName = name + "_num_metadata.csv"
        timeFileName = name + "_time_metadata.csv"

        if not self._pathExists(numFileName):
            self.createNumMetadata(df, num_cols, numFileName)
        else:
            print("num metadata file exists for table {}".format(name))
        if not self._pathExists(timeFileName):
            self.createTimeMetadata(df, time_cols, timeFileName)
        else:
            print("timestamp metadata file exists for table {}".format(name))

    def createTimeMetadata(self, df, time_cols, timeFileName):
        """Write ``(colName, min, max)`` rows for the timestamp columns of ``df``.

        All bounds come from a single aggregation and are written in a single
        save, so a failure cannot leave a half-filled metadata directory that
        later registrations would mistake for complete. Columns whose min or
        max is null (for example all-null columns) are skipped. Nothing is
        written when no row remains.
        """
        if not time_cols:
            return

        aggregates = []
        for colName in time_cols:
            aggregates.append(f.min(df[colName]))
            aggregates.append(f.max(df[colName]))
        bounds = df.agg(*aggregates).collect()[0]

        rows = []
        for index, colName in enumerate(time_cols):
            lower, upper = bounds[2 * index], bounds[2 * index + 1]
            if lower is None or upper is None:
                continue
            rows.append((colName,
                         lower.strftime(self._TIME_FORMAT),
                         upper.strftime(self._TIME_FORMAT)))

        if rows:
            metaDf = self.spark.createDataFrame(rows, ["colName", "min", "max"])
            metaDf.write.save(path=timeFileName, header="false", format='csv', mode='append', sep='^')

    def createNumMetadata(self, df, num_cols, numFileName):
        """Write ``(colName, min, max)`` rows for the numeric columns of ``df``.

        The bounds are taken from ``describe()``. Its rows are looked up by
        their ``summary`` label rather than by position. Columns that
        ``describe()`` omits, or whose min/max is null, are skipped. Nothing
        is written when no row remains.
        """
        if not num_cols:
            return

        stats = {row["summary"]: row.asDict() for row in df[num_cols].describe().collect()}
        minima = stats.get("min", {})
        maxima = stats.get("max", {})

        rows = []
        for colName in num_cols:
            lower, upper = minima.get(colName), maxima.get(colName)
            if lower is None or upper is None:
                continue
            rows.append((colName, float(lower), float(upper)))

        if rows:
            metaDf = self.spark.createDataFrame(rows, ["colName", "min", "max"])
            metaDf.write.save(path=numFileName, header="false", format='csv', mode='append', sep='^')

    def _columnsWithinRange(self, suffix, valueType, lower, upper):
        """Return ``(colName, tableName)`` rows whose stored min > lower and max < upper.

        ``suffix`` selects the metadata file per table and ``valueType`` is the
        Spark type used to parse the stored min/max. Tables without a metadata
        file are ignored. If no registered table has one, an empty DataFrame
        with the same two columns is returned.
        """
        schema = StructType([
            StructField("colName", StringType(), True),
            StructField("min", valueType, True),
            StructField("max", valueType, True)])

        matches = []
        for tableName in self.tableNames:
            filename = tableName + suffix
            if not self._pathExists(filename):
                continue
            # Read through the session handed to the constructor, not a global.
            metaDf = self.spark.read.csv(filename, header=False, schema=schema, sep='^')
            matches.append(
                metaDf.where(metaDf["min"] > lower)
                      .where(metaDf["max"] < upper)
                      .select(metaDf["colName"])
                      .withColumn("tableName", f.lit(tableName)))

        if not matches:
            emptySchema = StructType([
                StructField("colName", StringType(), True),
                StructField("tableName", StringType(), False)])
            return self.spark.createDataFrame(self.sc.emptyRDD(), emptySchema)

        resultDf = matches[0]
        for other in matches[1:]:
            resultDf = resultDf.union(other)
        return resultDf

    def timeColWithinRange(self, minTime, maxTime):
        """Find timestamp columns whose values lie strictly between ``minTime`` and ``maxTime``.

        Raises
        ------
        TypeError
            If either bound is not a ``datetime.datetime``.
        """
        if not isinstance(minTime, datetime.datetime) or not isinstance(maxTime, datetime.datetime):
            raise TypeError("minTime, maxTime must be timestamp")
        return self._columnsWithinRange('_time_metadata.csv', TimestampType(), minTime, maxTime)

    def numColWithinRange(self, minNum, maxNum):
        """Find numeric columns whose values lie strictly between ``minNum`` and ``maxNum``.

        Raises
        ------
        TypeError
            If either bound is a string or a ``datetime.datetime``.
        """
        # int, bigint, float, long
        if isinstance(minNum, (str, datetime.datetime)) or isinstance(maxNum, (str, datetime.datetime)):
            raise TypeError("minNum, maxNum must be number")
        return self._columnsWithinRange('_num_metadata.csv', DoubleType(), minNum, maxNum)

In [16]:
# Register the two homework tables, then query the stored metadata.
tc = TableCollections(spark, sc)
for table_df, table_name in ((openTable, "open"), (parkingTable, "parking")):
    tc.register(table_df, table_name)

# Numeric columns whose values lie strictly between the two bounds.
numeric_matches = tc.numColWithinRange(0, 1000000000000)
numeric_matches.show()

# Timestamp columns whose values lie strictly between the two dates.
range_start = datetime.datetime(1994,1,1)
range_end = datetime.datetime(2018,5,1)
time_matches = tc.timeColWithinRange(range_start, range_end)
time_matches.show()

num metadata file exists for table open
num metadata file exists for table parking
timestamp metadata file exists for table parking
+--------------+---------+
|       colName|tableName|
+--------------+---------+
|summons_number|     open|
|   fine_amount|     open|
|summons_number|  parking|
|violation_code|  parking|
+--------------+---------+

+----------+---------+
|   colName|tableName|
+----------+---------+
|issue_date|  parking|
+----------+---------+



In [None]:
tc = TableCollections(spark, sc)
dirname = '/user/jp4989/open_data_csv/'

# Read the dataset ids up front and close the file before any Spark work.
# The handle must NOT be called `f`: that name is the notebook-wide alias for
# pyspark.sql.functions, and rebinding it makes every f.min / f.max / f.lit
# call inside TableCollections hit a file object instead.
# Blank lines are dropped so they do not turn into the path `dirname + '.csv'`.
with open('data_ids', 'r') as ids_file:
    dataset_ids = [line.strip() for line in ids_file if line.strip()]

for dataset_id in dataset_ids:
    filename = dirname + dataset_id + '.csv'
    df = spark.read.format('csv').options(header='true',inferschema='true').load(filename)
    # Tables with no rows are not registered.
    if not df.rdd.isEmpty():
        tc.register(df, 'nyc_' + dataset_id.replace('-','_'))