In [1]:
import findspark
findspark.init()

from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

import numpy as np
import pandas as pd
from datetime import date, datetime
from dateutil.relativedelta import relativedelta

In [2]:
import sys
import os

# Need to set environment variables in order to make pyspark work in Python 3.6.
# NOTE(review): the py4j archive version and the interpreter paths below are hard-coded
# for one specific host — confirm they match the local Spark/Python install.
spark_home=os.environ['SPARK_HOME']  # raises KeyError when SPARK_HOME is not set
# PYTHONPATH is overwritten (not appended to) with the py4j archive under SPARK_HOME.
os.environ['PYTHONPATH']= spark_home+"/python/lib/py4j-0.10.4-src.zip"
# Both PySpark interpreter variables point at the same Python 3.6 binary.
os.environ['PYSPARK_PYTHON']="/dfm0/util/dfm_python/python36/bin/python3.6"
os.environ['PYSPARK_DRIVER_PYTHON']="/dfm0/util/dfm_python/python36/bin/python3.6"

# Base directory; later joined with 'extn_hashes/homer_country_map.csv' to load the country map.
super_dir='/dfm1/lijli06/DFM_SPARK'

In [3]:
# Create (or reuse) a SparkSession running in local mode with master 'local[5]'.
spark = (
    SparkSession.builder
    .master('local[5]')
    .appName("local_test")
    .getOrCreate()
)

In [4]:
# Load one day's audit-log Avro file, pass DATELOGGED through from_utc_timestamp
# (target zone 'UTC'), and derive a DATE column from it.
tmp_path = "hdfs://dfm-cluster/DFM/sysauditlog/2018/08/01/sysauditlog.avro"
tmp_df = (
    spark.read.format("com.databricks.spark.avro")
    .load(tmp_path)
    .withColumn('DATELOGGED', F.from_utc_timestamp('DATELOGGED', 'UTC'))
    .withColumn('DATE', F.to_date('DATELOGGED'))
)

# Test Pandas UDF for Datetime Objects

In [5]:
@F.pandas_udf('string', F.PandasUDFType.SCALAR)
def get_hour_str(s):
    """Format each timestamp as the start of its hour, e.g. 19:03:02 -> '19:00:00'.

    ``s`` is a datetime-like pandas Series (it is accessed through ``.dt``).
    Returns a Series of strings whose minute and second parts are always ``00``.
    """
    hour_bucket_format = '%H:00:00'
    return s.dt.strftime(hour_bucket_format)
# Alternative registration without the decorator:
# get_hour_str_udf = F.pandas_udf(get_hour_str, returnType=StringType())

In [6]:
# Pull ten rows of ORGNAME/DATELOGGED to the driver as a pandas DataFrame for a local check.
df = tmp_df.select('ORGNAME', 'DATELOGGED').limit(10).toPandas()

In [7]:
# Call the wrapped Python function (`.func`) directly on the pandas column, without going through Spark.
get_hour_str.func(df.DATELOGGED)

0    19:00:00
1    19:00:00
2    06:00:00
3    06:00:00
4    00:00:00
5    00:00:00
6    00:00:00
7    19:00:00
8    19:00:00
9    06:00:00
Name: DATELOGGED, dtype: object

In [8]:
# Run the UDF through Spark on a ten-row sample and print DATELOGGED next to the derived HOUR.
(
    tmp_df.select('DATELOGGED')
    .limit(10)
    .withColumn('HOUR', get_hour_str('DATELOGGED'))
    .show()
)

+-------------------+--------+
|         DATELOGGED|    HOUR|
+-------------------+--------+
|2018-08-01 19:03:02|19:00:00|
|2018-08-01 19:03:21|19:00:00|
|2018-08-01 06:45:31|06:00:00|
|2018-08-01 06:45:37|06:00:00|
|2018-08-01 00:16:38|00:00:00|
|2018-08-01 00:17:08|00:00:00|
|2018-08-01 00:17:10|00:00:00|
|2018-08-01 19:06:14|19:00:00|
|2018-08-01 19:06:18|19:00:00|
|2018-08-01 06:45:43|06:00:00|
+-------------------+--------+



# Test Pandas UDF for Country (string) Updates

In [9]:
# Build the issuer-country lookup table from the CSV under super_dir.
# keep_default_na=False keeps empty cells as '' instead of turning them into NaN.
fibin_country = pd.read_csv(os.path.join(super_dir, 'extn_hashes/homer_country_map.csv'),header=0, keep_default_na=False)
# Rows with a specific FIBIN: keyed by "ORGNAME@FIBIN".
fibin_country1 = fibin_country[fibin_country['FIBIN']!='all'].copy()
fibin_country1['ORGNAME'] = fibin_country1['ORGNAME']+'@'+fibin_country1['FIBIN']
# NOTE: fibin_country1 is rebound here from a DataFrame to a dict {key: ISSUERCOUNTRY}.
fibin_country1 = dict(fibin_country1[['ORGNAME', 'ISSUERCOUNTRY']].values)
# Rows whose FIBIN is the literal 'all': keyed by ORGNAME alone.
fibin_country2 = fibin_country[fibin_country['FIBIN']=='all'].copy()
fibin_country2 = dict(fibin_country2[['ORGNAME', 'ISSUERCOUNTRY']].values)
# Single merged lookup; on a duplicate key the fibin_country2 entry wins.
dicts = {**fibin_country1, **fibin_country2}
def update_issuer_country(c1, c2, dicts=dicts):
    """Look up the issuer country for each (c1, c2) pair.

    Parameters
    ----------
    c1 : pandas.Series of str
        First key component (the notebook passes ORGNAME).
    c2 : pandas.Series of str
        Second key component (the notebook passes FIBIN), aligned with ``c1``.
    dicts : dict, optional
        Lookup table keyed by ``"<c1>@<c2>"`` for pair-specific entries and by
        plain ``<c1>`` for entries that apply regardless of ``c2``. Defaults to
        the module-level ``dicts`` as it exists when this function is defined.

    Returns
    -------
    pandas.Series
        The pair-specific entry when one exists, otherwise the ``c1``-only
        entry, otherwise the empty string.
    """
    # Series.map performs a direct per-value lookup and yields NaN for keys
    # that are absent (or null), so "no match" is detected explicitly instead
    # of by searching the result for '@'. This also keeps null inputs from
    # raising during boolean masking, and never blanks a legitimate match
    # whose value happens to equal the c1 text.
    pair_match = (c1 + '@' + c2).map(dicts)
    org_match = c1.map(dicts)
    return pair_match.fillna(org_match).fillna('')
update_issuer_country_udf = F.pandas_udf(update_issuer_country, returnType=StringType())

In [10]:
# Show the raw ISSUERCOUNTRY values of five rows, for comparison with the mapped values.
tmp_df.select('ISSUERCOUNTRY').limit(5).show()

+-------------+
|ISSUERCOUNTRY|
+-------------+
|          826|
|          826|
|          826|
|          826|
|          826|
+-------------+



In [11]:
# Apply the lookup UDF to (ORGNAME, FIBIN) through Spark and show five mapped values.
tmp_df.select(update_issuer_country_udf(F.col('ORGNAME'), F.col('FIBIN')).alias('NewCountry')).limit(5).show()

+--------------+
|    NewCountry|
+--------------+
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
+--------------+



# Test Pandas UDF Median on Groupby

### Use pandas_udf

In [12]:
# Column names used by the median UDF and by the groupBy calls in the timing cells.
group_field = 'DATE'
pivot_field = 'ORGNAME'
weight_field = 'AMOUNTUSD'
# Reuse the Spark schema of these three columns as the grouped-map UDF's output schema.
schema = tmp_df.select(group_field, pivot_field, weight_field).schema

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def median_amt(amt_df):
    """Reduce one group's pandas DataFrame to a single row carrying its median amount.

    The output row holds the first ``group_field`` value, the first
    ``pivot_field`` value, and the median of ``weight_field``, in the same
    column order as ``schema``.
    """
    output_columns = [group_field, pivot_field, weight_field]
    # The keys are read from the first row; the notebook applies this UDF after
    # groupBy([group_field, pivot_field]), so they are constant within a group.
    group_key = amt_df[group_field].iloc[0]
    pivot_key = amt_df[pivot_field].iloc[0]
    median_amount = amt_df[weight_field].median()
    return pd.DataFrame([[group_key, pivot_key, median_amount]], columns=output_columns)

In [13]:
# Pull up to 50 rows of DATE/AMOUNTUSD to pandas for inspection (rebinds `df`).
df = tmp_df.select(['DATE', 'AMOUNTUSD']).limit(50).toPandas()

In [14]:
# Display the sampled frame.
df

Unnamed: 0,DATE,AMOUNTUSD
0,2018-08-01,642.004888
1,2018-08-01,0.0
2,2018-08-01,39.73112
3,2018-08-01,314.58434
4,2018-08-01,91.6872
5,2018-08-01,480.0
6,2018-08-01,56.5
7,2018-08-01,100.0224
8,2018-08-01,64.5978
9,2018-08-01,138.92


In [15]:
# Time the grouped-map pandas UDF median per (DATE, ORGNAME), including collecting the rows to the driver.
%timeit tmp_df.select(['DATE', 'ORGNAME', 'AMOUNTUSD']).groupBy([group_field, pivot_field]).apply(median_amt).rdd.map(list).collect()

40.1 s ± 186 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Use percentile_approx

In [16]:
# Built-in approximate median: percentile 0.5 of AMOUNTUSD with accuracy argument 800.
# NOTE(review): the column name is hard-coded here rather than taken from weight_field.
agg_func = F.expr('percentile_approx(AMOUNTUSD, 0.5, 800)')

In [17]:
# Time the same grouping with the built-in percentile_approx aggregate, collected the same way.
%timeit tmp_df.select(['DATE', 'ORGNAME', 'AMOUNTUSD']).groupBy([group_field, pivot_field]).agg(agg_func).rdd.map(list).collect()

35.8 s ± 244 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# Test Pandas UDF for Date Mapping

In [18]:
# Anchor date passed to date_map as the origin of the buckets.
start_date = date(2018, 8, 1)
def _month_starts(month_numbers, like):
    """Convert absolute month numbers (``year * 12 + month - 1``) to month-start timestamps.

    Missing entries (NaN) become NaT. The result carries the index and name of ``like``.
    """
    starts = np.full(len(month_numbers), np.datetime64('NaT'), dtype='datetime64[ns]')
    present = month_numbers.notna().values
    # numpy counts datetime64[M] values in whole calendar months since 1970-01,
    # so this conversion is exact (no average-month-length approximation).
    since_epoch = month_numbers.values[present].astype('int64') - 1970 * 12
    starts[present] = since_epoch.astype('datetime64[M]').astype('datetime64[ns]')
    return pd.Series(starts, index=like.index, name=like.name)


def date_map(s, start_date, interval=2, unit='months'):
    """Map each date to the first day of the fixed-width bucket that contains it.

    Buckets are ``interval`` units wide and are counted from ``start_date``;
    dates earlier than the anchor fall into earlier buckets (floor division).

    Parameters
    ----------
    s : pandas.Series
        Dates to map (``datetime.date`` objects or anything ``pd.to_datetime``
        accepts). The input Series is never modified.
    start_date : datetime.date
        Anchor of the buckets. For ``'months'`` it is snapped to the first day
        of its month; for years to 1 January of its year; for ``'days'`` it is
        used as given.
    interval : int, default 2
        Bucket width in ``unit``; must be positive.
    unit : str, default 'months'
        ``'days'`` or ``'months'`` (case-insensitive). Any other value is
        treated as years.

    Returns
    -------
    pandas.Series
        ``datetime.date`` bucket starts with the index and name of ``s``;
        missing inputs stay missing (NaT).

    Raises
    ------
    ValueError
        If ``interval`` is not positive.
    """
    if interval <= 0:
        raise ValueError('interval must be a positive number of units')

    unit = unit.lower()
    dates = pd.to_datetime(s)

    if unit == 'days':
        origin = pd.Timestamp(start_date)
        elapsed_days = (dates - origin).dt.days
        bucket_offset = elapsed_days // interval * interval
        mapped = origin + pd.to_timedelta(bucket_offset, unit='D')
    elif unit == 'months':
        # Work on absolute month numbers so bucket edges always fall on real
        # calendar month starts, whatever month the anchor is in.
        origin_month = start_date.year * 12 + start_date.month - 1
        month_number = dates.dt.year * 12 + dates.dt.month - 1
        bucket_month = origin_month + (month_number - origin_month) // interval * interval
        mapped = _month_starts(bucket_month, dates)
    else:
        bucket_year = start_date.year + (dates.dt.year - start_date.year) // interval * interval
        # January of the bucket year, expressed as an absolute month number.
        mapped = _month_starts(bucket_year * 12, dates)

    return mapped.dt.date

# Wrap date_map as a pandas UDF returning DateType. start_date is bound as a lambda default
# when this line runs; interval and unit keep date_map's defaults (2, 'months').
date_map_udf = F.pandas_udf(lambda s, start_date=start_date: date_map(s, start_date), returnType=DateType())

In [19]:
# Small hand-made frame of dates for checking date_map locally (rebinds `df`).
df = pd.DataFrame([[1, date(2018, 8, 4)], [2, date(2019, 11, 10)], [2, date(2022, 12, 21)]], columns=['id', 'date'])
df

Unnamed: 0,id,date
0,1,2018-08-04
1,2,2019-11-10
2,2,2022-12-21


In [20]:
# Call the wrapped lambda (`.func`) directly on the pandas column, without going through Spark.
date_map_udf.func(df.date)

0    2018-08-01
1    2019-10-01
2    2022-12-01
Name: date, dtype: object

In [21]:
# Apply the UDF through Spark, replacing DATE with its mapped bucket start, and print the result.
tmp_df.select('DATELOGGED','DATE').limit(50).withColumn('DATE', date_map_udf('DATE')).show()

+-------------------+----------+
|         DATELOGGED|      DATE|
+-------------------+----------+
|2018-08-01 19:03:02|2018-08-01|
|2018-08-01 19:03:21|2018-08-01|
|2018-08-01 06:45:31|2018-08-01|
|2018-08-01 06:45:37|2018-08-01|
|2018-08-01 00:16:38|2018-08-01|
|2018-08-01 00:17:08|2018-08-01|
|2018-08-01 00:17:10|2018-08-01|
|2018-08-01 19:06:14|2018-08-01|
|2018-08-01 19:06:18|2018-08-01|
|2018-08-01 06:45:43|2018-08-01|
|2018-08-01 06:44:53|2018-08-01|
|2018-08-01 00:17:15|2018-08-01|
|2018-08-01 00:17:22|2018-08-01|
|2018-08-01 19:08:56|2018-08-01|
|2018-08-01 19:09:01|2018-08-01|
|2018-08-01 06:44:55|2018-08-01|
|2018-08-01 06:44:55|2018-08-01|
|2018-08-01 00:17:22|2018-08-01|
|2018-08-01 00:16:30|2018-08-01|
|2018-08-01 19:09:55|2018-08-01|
+-------------------+----------+
only showing top 20 rows



In [22]:
# Stop the SparkSession created at the top of the notebook.
spark.stop()