In [1]:
import os
import pandas as pd
import numpy as np

import joblib
from joblib import Parallel, delayed
from tqdm import tqdm
import dask
import dask.dataframe as dd

import pyspark
from pyspark.sql import SparkSession
# Create (or reuse) a local Spark session that uses every available core.
# The memory settings below are passed through to Spark unchanged.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.memory", "700g")
    .config("spark.driver.memory", "500g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "256g")
    .appName('sachin')
    .getOrCreate()
)

from functools import reduce 
from pyspark.sql import DataFrame
from pyspark.sql.functions import sum, col, desc, countDistinct, isnan, when, count, isnull, coalesce, substring
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [3]:
# Switch the working directory to the savepoints folder used by later cells.
# The target is relative, so running this cell twice would try to descend
# from the new location; skip the chdir when we are already there.
_savepoints_suffix = os.path.join('data', 'savepoints')
if not os.path.normpath(os.getcwd()).endswith(_savepoints_suffix):
    os.chdir('../data/savepoints')

In [6]:
# Load the merged, cleaned trips CSV from the current working directory.
# os.path.join picks the right separator for the platform; the previous
# hand-built path hard-coded Windows backslashes.
trips_csv_path = os.path.join(os.getcwd(), 'trips_merged_clean.csv')
df = spark.read.options(delimiter=',').csv(trips_csv_path, header=True, inferSchema=True)

# Row count (this triggers a full pass over the file) and column count.
df.count(), len(df.columns)

(62334388, 19)

In [7]:
# Print the column names and the types Spark inferred while reading the CSV.
df.printSchema()

root
 |-- tripduration: double (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: double (nullable = true)
 |-- start station longitude: double (nullable = true)
 |-- start_neighborhood: string (nullable = true)
 |-- start_borough: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: double (nullable = true)
 |-- end station longitude: double (nullable = true)
 |-- end_neighborhood: string (nullable = true)
 |-- end_borough: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- month_year: timestamp (nullable = true)
 |-- year: integer (nullable = true)



In [8]:
# Preview the first rows (show() defaults to 20) without truncating long cell values.
df.show(truncate=False)


+------------+-----------------------+-----------------------+----------------+-------------------------------+----------------------+-----------------------+------------------------------------------+-------------+--------------+----------------------------------+--------------------+---------------------+------------------------------------------+-----------+-------------------+-----+-------------------+----+
|tripduration|starttime              |stoptime               |start station id|start station name             |start station latitude|start station longitude|start_neighborhood                        |start_borough|end station id|end station name                  |end station latitude|end station longitude|end_neighborhood                          |end_borough|date               |month|month_year         |year|
+------------+-----------------------+-----------------------+----------------+-------------------------------+----------------------+-----------------------+------------

In [10]:
# Preview only the rows that have no null in any column.
# The result is not assigned, so `df` itself is left untouched.
df.dropna(how="any").show()

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------------+-------------+--------------+--------------------+--------------------+---------------------+--------------------+-----------+-------------------+-----+-------------------+----+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|  start_neighborhood|start_borough|end station id|    end station name|end station latitude|end station longitude|    end_neighborhood|end_borough|               date|month|         month_year|year|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------------+-------------+--------------+--------------------+--------------------+---------------------+--------------------+-----------+-------------------+-----+------

In [12]:
# Print the schema again; `df` has not been reassigned since it was loaded.
df.printSchema()

root
 |-- tripduration: double (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: double (nullable = true)
 |-- start station longitude: double (nullable = true)
 |-- start_neighborhood: string (nullable = true)
 |-- start_borough: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: double (nullable = true)
 |-- end station longitude: double (nullable = true)
 |-- end_neighborhood: string (nullable = true)
 |-- end_borough: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- month_year: timestamp (nullable = true)
 |-- year: integer (nullable = true)



In [19]:
# Display up to 1000 trips whose start station id contains "4138" anywhere
# in the string (the id column is a string, so this is a substring match).
start_id_matches = F.col("start station id").like("%4138%")
df.where(start_id_matches).show(1000)

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+------------------+-------------+--------------+--------------------+--------------------+---------------------+--------------------+-----------+-------------------+-----+-------------------+----+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|start_neighborhood|start_borough|end station id|    end station name|end station latitude|end station longitude|    end_neighborhood|end_borough|               date|month|         month_year|year|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+------------------+-------------+--------------+--------------------+--------------------+---------------------+--------------------+-----------+-------------------+-----+------------

In [None]:
# NOTE(review): this cell has no execution count and uses pandas-style calls
# (to_csv, info, value_counts, column indexing with a list). In the cells
# visible here `df` is loaded through spark.read, so these calls presumably
# expect a pandas DataFrame from an earlier version of the notebook — confirm.

# NOTE(review): `df2` is not defined in any cell visible here — confirm its origin.
df2.to_csv('../savepoints/trips_reporting_struct2_clean.csv', index=False)
# NOTE(review): after the earlier os.chdir('../data/savepoints') this relative
# path appears to resolve to the same trips_merged_clean.csv that is read when
# loading `df` — confirm that overwriting the input file is intended.
df.to_csv('../savepoints/trips_merged_clean.csv', index=False)
# del a, ax, borough_name, boroughs, dc, dff2, fig, lat, lon, most_common_names,neighborhood_name, nta, point, num_cores, q05, q95, results, x1, y1, x2, y2
# Summary of columns with non-null counts.
df.info(verbose=True, show_counts=True)
# Frequency of each start station, each end station, and each (start, end) pair.
# Only the last expression of a cell is displayed, so the first two results
# are computed and discarded.
df['start station id'].value_counts()
df['end station id'].value_counts()
df[['start station id', 'end station id']].value_counts()

# NOTE(review): dd.from_pandas expects a pandas DataFrame; confirm `df` is
# one at this point (the visible cells load it through Spark).
ddf = dd.from_pandas(df, npartitions=24)  # split the frame into 24 partitions

# Distinct values of every remaining column per (start station, date) pair.
grouped = ddf.groupby(['start station id', 'date']).nunique().compute()

# The grouping keys are now index levels, and the station key is spelled with
# spaces. Grouping on 'start_station_id' (underscores) raised a KeyError, so
# group on the index level by its real name instead.
counts = grouped.groupby(level='start station id').count()
counts