http://spark.apache.org/

In [None]:
import datetime
import re
from pprint import pprint

import numpy as np
import pandas as pd

In [None]:
#!pip install pyspark

In [None]:
#!pip install pyarrow

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Create (or reuse, via getOrCreate) the Spark session used by every later cell.
session_name = "sales_data"

spark = SparkSession.builder.appName(session_name).getOrCreate()

# Display the session object as the cell output.
spark

In [None]:
# Build a synthetic pandas sales dataset of N rows.
N = 35_000

current_date = datetime.date.today()
current_time = datetime.datetime.now()

# Probability weights for the eight employees drawn below (they sum to 1).
pval = [.05,.25,.3,.05,.1,.1,.1,.05]

# Draw every random column from this seeded generator so the data is
# reproducible. Merely constructing a RandomState does NOT seed numpy's global
# generator, so the draws must go through `rng` itself.
rng = np.random.RandomState(seed=7)

df = pd.DataFrame.from_dict(
    {
        'trans_dates':rng.choice(np.arange('2010-01-01', '2021-01-01', dtype='datetime64[D]'), size=N),
        'LOCATIONS':rng.choice(['CA','TX','NY','OH', 'FL'], size=(N,), p=[.1,.2,.3,.35,.05]),
        'employees':rng.choice(['Aiden', 'Leslie', 'Ian', 'Harlan', 'Maeve', 'Kendra', 'Allen', 'Jo'], size=(N,), p=pval),
        'Sales/Hrs':rng.choice(np.arange(1, 17, 1), size=N),
        'sales tot$':rng.normal(loc=1000, scale=20, size=N),
        # Scalar string broadcast to every row, formatted as MM/DD/YYYY.
        'doc_date':current_date.strftime("%m/%d/%Y"),
    })

# True for transactions dated on or before today. The comparison already yields
# a boolean Series, so no np.where(..., True, False) wrapper is needed.
df['actuals'] = df['trans_dates'] <= pd.Timestamp(current_date)

df.info()

In [None]:
# Preview the first rows of the generated pandas DataFrame.
df.head()

In [None]:
# Grand total of the raw 'sales tot$' column (column labels are not cleaned yet).
df['sales tot$'].sum()

In [None]:
def clean_cols(x: str) -> str:
    """Normalise a column label into a lower-case, underscore-separated name.

    The label is lower-cased; ``,``, ``$`` and ``-`` are removed; ``:``, ``/``
    and spaces become ``_``; finally any run of consecutive underscores is
    collapsed to a single ``_``.

    >>> clean_cols('sales tot$')
    'sales_tot'
    >>> clean_cols('Sales/Hrs')
    'sales_hrs'
    """
    # One translation pass: map ':', '/', ' ' to '_' and delete ',', '$', '-'.
    x = x.lower().translate(str.maketrans(":/ ", "___", ",$-"))
    # A regex collapses underscore runs of any length (e.g. 'a  b' -> 'a_b'),
    # which a fixed sequence of '___' / '__' replacements cannot guarantee.
    return re.sub(r"_{2,}", "_", x)

In [None]:
# Show the raw column labels before they are normalised with clean_cols.
df.columns

In [None]:
# Replace every column label with its normalised form, then display the result.
df.columns = [clean_cols(label) for label in df.columns]
df.columns

In [None]:
# Map pandas dtype names to Spark SQL type names and print a
# "name type" column list that can be pasted into a schema / CREATE TABLE.
# Unmapped dtypes fall back to 'string'.
sql_types = {
    'object':'string',
    'bool':'boolean',
    # int64 is 8 bytes: Spark's INTEGER is 32-bit, so BIGINT is the lossless match.
    'int64':'bigint',
    'int32':'integer',
    'int16':'integer',
    'float64':'double',
    'float32':'double',
    'datetime64[ns]':'date'
}

ddl_lines = []
for col_name, dtype in df.dtypes.items():
    col_type = str(dtype)

    # Columns whose name mentions 'date', and datetime64 columns of any
    # resolution (pandas may store e.g. datetime64[s]), are listed as 'date'.
    if 'date' in col_name or col_type.startswith('datetime64'):
        spark_type = 'date'
    else:
        spark_type = sql_types.get(col_type, 'string')
    ddl_lines.append(f"{col_name} {spark_type}")

# Comma-separate the entries without a trailing comma after the last column,
# which would be invalid in a DDL column list.
print(",\n".join(ddl_lines))

In [None]:
# Persist the pandas frame as Parquet at the path "sales_data" (pandas needs a
# Parquet engine such as pyarrow installed) so Spark can read it back.
df.to_parquet("sales_data")

In [None]:
# Load the Parquet data into a Spark DataFrame.
ds = spark.read.parquet("sales_data") 

In [None]:
# Inspect the schema Spark inferred from the Parquet file.
ds.printSchema()

In [None]:
# Show rows without truncating long cell values.
ds.show(truncate=False)

In [None]:
# (column name, Spark type) pairs.
ds.dtypes

In [None]:
ds.columns

In [None]:
# 'sales_tot' is clean_cols('sales tot$'); give it a more descriptive name.
ds = ds.withColumnRenamed('sales_tot', 'sales_total')

In [None]:
# Parse doc_date, a string written by pandas with strftime("%m/%d/%Y"), into a
# timestamp. Spark datetime patterns are case-sensitive: "MM" is month-of-year
# and "dd" day-of-month, whereas lowercase "m" would mean minute-of-hour.
ds = ds.withColumn('doc_date', to_timestamp(col('doc_date'), "MM/dd/yyyy"))
ds.show()

In [None]:
# NOTE(review): this rebinds current_date, previously a datetime.date, to a full
# datetime for "now". Both bindings shadow pyspark.sql.functions.current_date
# from the star import; a distinct name would avoid that.
current_date = datetime.datetime.now()
current_date

In [None]:
# Attach the notebook run time as a constant literal column on every row.
ds = ds.withColumn('current_date', lit(current_date))
ds.show(10, False)

In [None]:
# Round sales_total to 2 decimal places. `round` here is
# pyspark.sql.functions.round (from the star import), not the Python builtin.
ds = ds.withColumn('sales_total', round(col('sales_total') ,2))
ds.show(10, False)

In [None]:
# Inspect the schema after the column transformations above.
ds.printSchema()

In [None]:
# NOTE(review): identical to the previous cell; can be removed.
ds.printSchema()

In [None]:
def emp_sum(e):
    """Return the reporting group for employee name ``e``.

    'Maeve' maps to 'Group1', 'Harlan' to 'Group2', and every other value
    (including None) to 'Group3'.
    """
    return {"Maeve": "Group1", "Harlan": "Group2"}.get(e, "Group3")

# Rebind the name to a Spark UDF returning StringType so it can be applied to
# DataFrame columns, e.g. emp_sum('employees').
emp_sum = udf(emp_sum, StringType())

In [None]:
# Derive the employee group column with the emp_sum UDF.
ds = ds.withColumn('employee_summary', emp_sum('employees'))

In [None]:
ds.dtypes

In [None]:
ds.count()

In [None]:
ds.show(10, False)

In [None]:
# ~10% sample without replacement. No seed is passed, so the rows shown
# change from run to run.
ds.sample(False, fraction=0.1).show()

In [None]:
# Rows in ascending transaction-date order.
ds.sort('trans_dates').show()

In [None]:
# NOTE(review): identical to the previous cell; can be removed.
ds.sort('trans_dates').show()

In [None]:
# Only California transactions.
ds.filter(ds['locations']=='CA').show()

In [None]:
# Total sales_total per employee group (dict-style aggregation).
ds.groupBy('employee_summary').agg({'sales_total':'sum'}).show()

In [None]:
# Sum / mean / max / min of each column in `aggs`, per combination of `groups`.
# sum, mean, max and min are the pyspark.sql.functions versions brought in by
# the star import, not the Python builtins.
groups = ['locations']
aggs = ['sales_total']

func = [sum, mean, max, min]

# Named agg_exprs rather than `expr` so pyspark.sql.functions.expr (also from
# the star import) is not shadowed for later cells.
agg_exprs = [f(col(c)) for f in func for c in aggs]

# Order by the same keys used for grouping so changing `groups` stays consistent.
ds.groupBy(*groups).agg(*agg_exprs).orderBy(*groups).show()

In [None]:
# Total sales per location (rows) and employee (one column per distinct name).
# `sum` is pyspark.sql.functions.sum from the star import.
ds.groupBy('locations').pivot('employees').agg(sum('sales_total')).orderBy('locations').show()

In [None]:
# Aiden's total sales per location, with one column per value of the boolean
# 'actuals' flag.
ds.filter(ds['employees']=='Aiden').groupBy('locations').pivot('actuals').agg(sum('sales_total')).orderBy('locations').show()

In [None]:
# Low-level SparkContext handle, used to parallelize the lookup rows below.
sc = spark.sparkContext

In [None]:

# Reference table of US states and territories. The three tuples are parallel:
# element i of each describes the same place (postal code, name, region).
locations = (
    'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI',
    'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI',
    'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC',
    'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT',
    'VT', 'VA', 'WA', 'WV', 'WI', 'WY', 'PR', 'GU', 'VI', 'AS', 'MP',
    'DC'
)

stname = (
    'Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado', 'Connecticut',
    'Delaware', 'Florida', 'Georgia', 'Hawaii', 'Idaho', 'Illinois', 'Indiana', 'Iowa', 'Kansas',
    'Kentucky', 'Louisiana',  'Maine', 'Maryland', 'Massachusetts', 'Michigan', 'Minnesota',
    'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada', 'New Hampshire', 'New Jersey',
    'New Mexico', 'New York', 'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma', 'Oregon',
    'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota', 'Tennessee', 'Texas',
    'Utah', 'Vermont', 'Virginia', 'Washington', 'West Virginia', 'Wisconsin', 'Wyoming',
    'Puerto Rico', 'Guam', 'U.S. Virgin Islands', 'American Samoa', 'Northern Mariana Islands',
    'District of Columbia'
)

# NOTE(review): the last six entries (PR, GU, VI, AS, MP, DC) are all tagged
# 'South'. DC belongs to the Census South region, but the territories are not
# part of any Census region; confirm this labelling is intended.
region = (
    'South', 'West', 'West', 'South', 'West', 'West', 'Northeast', 'South', 'South', 'South',
    'West', 'West', 'Midwest', 'Midwest', 'Midwest', 'Midwest', 'South', 'South', 'Northeast',
    'South', 'Northeast', 'Midwest', 'Midwest', 'South', 'Midwest', 'West', 'Midwest', 'West',
    'Northeast', 'Northeast', 'West', 'Northeast', 'South', 'Midwest', 'Midwest', 'South',
    'West', 'Northeast', 'Northeast', 'South', 'Midwest', 'South', 'South', 'West', 'Northeast',
    'South', 'West', 'South', 'Midwest', 'West', 'South', 'South', 'South', 'South', 'South',
    'South'
)

# zip() silently truncates to the shortest input, so an entry added to only one
# tuple would misalign or drop rows. Fail loudly instead.
if not len(locations) == len(stname) == len(region):
    raise ValueError(
        f"lookup tuples differ in length: locations={len(locations)}, "
        f"stname={len(stname)}, region={len(region)}"
    )

# Loop names avoid `col`, which would otherwise hide pyspark's col() inside the
# comprehension.
dt = sc.parallelize(
    [Row(locations=code, stname=name, region=reg) for code, name, reg in zip(locations, stname, region)]
).toDF()

dt.show()

In [None]:
# Postal codes and names of every lookup entry tagged 'South'.
dt.where("region == 'South'").select('locations', 'stname').show()

In [None]:
# Left join keeps every sales row; rows whose code has no match in dt get
# null stname/region.
dj = ds.join(dt, on='locations', how='left')
dj.show()

In [None]:
# Register the joined data as a temp view named "sales_data" for spark.sql queries.
dj.createOrReplaceTempView("sales_data")

In [None]:
# First 10 rows of the joined view.
spark.sql("SELECT * FROM sales_data LIMIT 10").show()

In [None]:
# Row count and total sales in thousands (rounded).
spark.sql("SELECT count(*) AS total_count, round(sum(sales_total)/1000) AS total_sales_k FROM sales_data").show()

In [None]:
# Sales in thousands per region, largest first (ORDER BY 2 = second select column).
spark.sql("SELECT region, round(sum(sales_total)/1000) AS sales_k FROM sales_data GROUP BY region ORDER BY 2 DESC").show()

In [None]:
# Collect Midwest/West rows into pandas. toPandas() brings every matching row
# to the driver.
pdf = spark.sql("SELECT region, stname, employees, sales_hrs, sales_total FROM sales_data WHERE region IN ('Midwest', 'West')").toPandas()
pdf.head()

In [None]:
# Row counts per region in the collected frame (nulls included if present).
pdf['region'].value_counts(dropna=False)