In [1]:
import numpy as np
import pandas as pd
import os
import sys
import glob
import time 
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import current_date
from fbprophet import Prophet
from reduce_mem_usage import reduce_mem_usage
from continuous_data import round_target_column, create_store_test
from office_preprocessing import preprocess_office

##### Metric Evaluation function

In [2]:
import os
os.chdir('../')

In [3]:
new_data = preprocess_office()

In [3]:
data = pd.read_csv('input_files/office_data.csv')
data = reduce_mem_usage(data)


closed_stores = ['Saturn Connect Trier', 'Media Markt Heilbronn 2','Saturn Schweinfurt Schrammstraße', 'Saturn Connect Köln',
                    'Saturn Stuttgart-Hbf', 'Saturn-Berlin Clayallee','Saturn Mönchengladbach - Stresemannstraße', 'Saturn Lübeck',
                    'Saturn München Theresienhöhe', 'Saturn Berlin-Alt-Treptow','Media Markt Turnstraße', 'Saturn Wiesbaden Bahnhofsplatz',
                    'Media Markt Ellwangen - DAS', 'Media Markt GmbH Nürtingen','Media Markt Meppen', 'Media Markt Schleswig', 
                    'Media-Saturn IT Services GmbH', 'Meida Markt Waiblingen', 'Saturn Bergisch Gladbach','Saturn Wesel', 'Saturn Hagen', 
                    'Media Markt Bad Cannstatt', 'Saturn Heidelberg', 'Saturn Hildesheim', 'Saturn Münster am York-Ring', 'Media Markt Köln-Chorweiler',
                    'Saturn Dessau', 'Saturn Essen-Steele','Saturn Euskirchen', 'Saturn Göttingen', 'Saturn Hennef', 'Saturn Herford',
                    'Saturn Düsseldorf','Saturn Itzehoe','Saturn Siegburg','Saturn Weiterstadt', 'Saturn Bremerhaven - BSS', 'Saturn Gelsenkirchen Buer']
data = data.loc[~data['Store_names'].isin(closed_stores)].reset_index(drop=True)


#convert the sales date into datetime format 

data['Sales Date'] = pd.to_datetime(data['Sales Date'])
data = data.drop_duplicates()


#data = (data.set_index("Sales Date").groupby(['Reseller City','Super Division', 'Business Unit', pd.Grouper(freq='W')])["Rslr Sales Quantity", "Rslr Sales Amount"].sum().astype(int).reset_index())
data['black_week'] = np.where((data['Sales Date'].dt.month==11) & (data['Sales Date'].dt.day > 23), 1, 0)

max_date = data['Sales Date'].max()

Mem. usage decreased to 78.10 Mb (19.4% reduction)


In [4]:
max_date

Timestamp('2022-02-26 00:00:00')

In [5]:
import calendar as cal
from datetime import *
from dateutil.relativedelta import *
import holidays

def black_week(year):
    black_friday = (datetime.date(datetime(year, 11, 1) + relativedelta(weekday=FR(+4)) - timedelta(days=4)), datetime.date(datetime(year, 11, 1) + relativedelta(weekday=FR(+4))+ timedelta(days=1)))
    cyber_monday = datetime.date(datetime(year, 11, 1) + relativedelta(weekday=FR(+4))+ timedelta(days=3))

    black_friday = pd.to_datetime(black_friday)
    black_friday =pd.date_range(black_friday[0], black_friday[1], freq ='d')

    cyber_monday = pd.to_datetime(cyber_monday)
    #cyber_monday = list(cyber_monday)


    return black_friday, cyber_monday

In [6]:
# ## create the test_dataframe 

test_data = create_store_test(data, max_date)

test_data['Sales Date'] = pd.to_datetime(test_data['Sales Date'])
test_data['black_week'] = np.where((test_data['Sales Date'].dt.month==11) & (test_data['Sales Date'].dt.day > 23), 1, 0)

data = pd.concat([data, test_data])

data['Sales Date'] = pd.to_datetime(data['Sales Date'])

In [7]:
test_data

Unnamed: 0,Super Division,Product Division,Business Unit,Store_names,Rslr Sales Amount,Sales Date,Rslr Sales Quantity,Reseller City,Reseller Postal Code,black_week
0,EDG Managed - Office License and HP,M365 Consumer,M365,Saturn Köln-Weiden,0,2022-02-27,0,Köln,50858,0
1,EDG Managed - Office License and HP,Office Standard,Office Standard,Saturn Köln-Weiden,0,2022-02-27,0,Köln,50858,0
2,EDG Managed - Office License and HP,M365 Consumer,M365,Fil. Media Markt Würzburg City,0,2022-02-27,0,Würzburg,97070,0
3,EDG Managed - Office License and HP,Office Standard,Office Standard,Fil. Media Markt Würzburg City,0,2022-02-27,0,Würzburg,97070,0
4,EDG Managed - Office License and HP,M365 Consumer,M365,MM Stade,0,2022-02-27,0,Stade,21682,0
...,...,...,...,...,...,...,...,...,...,...
791,EDG Managed - Office License and HP,Office Standard,Office Standard,Saturn Wolfsburg City Galerie,0,2022-03-31,0,Wolfsburg,38440,0
792,EDG Managed - Office License and HP,M365 Consumer,M365,Saturn Wuppertal-Elberfeld,0,2022-03-31,0,Wuppertal,42103,0
793,EDG Managed - Office License and HP,Office Standard,Office Standard,Saturn Wuppertal-Elberfeld,0,2022-03-31,0,Wuppertal,42103,0
794,EDG Managed - Office License and HP,M365 Consumer,M365,Saturn Zwickau,0,2022-03-31,0,Zwickau,8056,0


In [8]:
data = (data.set_index("Sales Date").groupby(['Store_names','Reseller City','Super Division', 'Product Division','Business Unit', pd.Grouper(freq='M')])["Rslr Sales Quantity", "Rslr Sales Amount"].sum().astype(int).reset_index())

In [9]:
data.head()

Unnamed: 0,Store_names,Reseller City,Super Division,Product Division,Business Unit,Sales Date,Rslr Sales Quantity,Rslr Sales Amount
0,Saturn Köln-Weiden,Köln,EDG Managed - Office License and HP,M365 Consumer,M365,2017-01-31,23,1247
1,Saturn Köln-Weiden,Köln,EDG Managed - Office License and HP,M365 Consumer,M365,2017-02-28,11,620
2,Saturn Köln-Weiden,Köln,EDG Managed - Office License and HP,M365 Consumer,M365,2017-03-31,16,906
3,Saturn Köln-Weiden,Köln,EDG Managed - Office License and HP,M365 Consumer,M365,2017-04-30,10,508
4,Saturn Köln-Weiden,Köln,EDG Managed - Office License and HP,M365 Consumer,M365,2017-05-31,10,530


In [10]:
data['black_week'] = np.where(data['Sales Date'].dt.month==11, 1, 0)

In [11]:
data['Business Unit'].unique()

array(['M365', 'Office Standard'], dtype=object)

In [12]:
import plotly.graph_objs as go
def plot_fig(forecast):
    
    forecast = (forecast.set_index("Sales Date").groupby(['Super Division', 'Product Division','Business Unit', pd.Grouper(freq='M')])["Rslr Sales Quantity", "Rslr Sales Amount"].sum().astype(int).reset_index())
    forecast1 = forecast.loc[forecast['Business Unit']=='M365']
    forecast2 = forecast.loc[forecast['Business Unit']=='Office Standard']
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=forecast1['Sales Date'], y=forecast1['Rslr Sales Quantity'], name='M365',))
    fig.add_trace(go.Scatter(x=forecast2['Sales Date'], y=forecast2['Rslr Sales Quantity'], name='Office Standard',))
    #fig.add_trace(go.Scatter(x=forecast['ds'], y=forecast['promos'], name='promos',))
    fig.show()

In [13]:
plot_fig(data)

In [14]:
data['black_week'] = np.where((data['Business Unit']=='M365') & (data['Sales Date'].dt.month==11) & (data['Sales Date'].dt.year==2017), 1, 0)
data['black_week'] = np.where((data['Business Unit']=='Office Standard') & (data['Sales Date'].dt.month==11) & (data['Sales Date'].dt.year==2019|2020), 1, data['black_week'])
data['christmas'] = np.where((data['Business Unit']=='M365') & (data['Sales Date'].dt.month==12) & (data['Sales Date'].dt.year==2017), 1, 0)
data['christmas'] = np.where((data['Business Unit']=='M365') & (data['Sales Date'].dt.month==12) & (data['Sales Date'].dt.year==2019), 2, data['christmas'])
data['christmas'] = np.where((data['Business Unit']=='M365') & (data['Sales Date'].dt.month==3) & (data['Sales Date'].dt.year==2021), 1, data['christmas'])
data['christmas'] = np.where((data['Business Unit']=='Office Standard') & (data['Sales Date'].dt.month==12) & (data['Sales Date'].dt.year==2017|2018|2019), 1, data['christmas'])
data['christmas'] = np.where((data['Business Unit']=='Office Standard') & (data['Sales Date'].dt.month==9) & (data['Sales Date'].dt.year==2019), 1, data['christmas'])
data['christmas'] = np.where((data['Business Unit']=='Office Standard') & (data['Sales Date'].dt.month==9) & (data['Sales Date'].dt.year==2021), 2, data['christmas'])
data['christmas'] = np.where((data['Business Unit']=='Office Standard') & (data['Sales Date'].dt.month==4) & (data['Sales Date'].dt.year==2019), 1, data['christmas'])
data['christmas'] = np.where((data['Business Unit']=='Office Standard') & (data['Sales Date'].dt.month==7|9) & (data['Sales Date'].dt.year==2019), 1, data['christmas'])



In [24]:
data.tail(10)

Unnamed: 0,Store_names,Reseller City,Super Division,Product Division,Business Unit,Sales Date,Rslr Sales Quantity,Rslr Sales Amount,black_week,christmas
48543,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2021-04-30,5,504,0,0
48544,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2021-05-31,8,906,0,0
48545,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2021-06-30,8,808,0,0
48546,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2021-07-31,11,1208,0,0
48547,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2021-08-31,10,1009,0,0
48548,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2021-09-30,21,2121,0,2
48549,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2021-10-31,24,2410,0,0
48550,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2022-01-31,3,299,0,0
48551,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2022-02-28,19,2093,0,0
48552,Saturn Zwickau,Zwickau,EDG Managed - Office License and HP,Office Standard,Office Standard,2022-03-31,0,0,0,0


In [17]:
data.isna().sum()

Store_names            0
Reseller City          0
Super Division         0
Product Division       0
Business Unit          0
Sales Date             0
Rslr Sales Quantity    0
Rslr Sales Amount      0
black_week             0
christmas              0
dtype: int64

In [18]:
data.nunique()

Store_names             398
Reseller City           264
Super Division            1
Product Division          2
Business Unit             2
Sales Date               61
Rslr Sales Quantity     389
Rslr Sales Amount      6484
black_week                2
christmas                 3
dtype: int64

In [19]:
import time 
spark = SparkSession.builder\
        .appName('office_forecasting') \
        .master('local[*]') \
        .config('spark.sql.execution.arrow.pyspark.enabled', True) \
        .config('spark.sql.execution.arrow.enabled', True) \
        .getOrCreate()
        # .config('spark.sql.session.timeZone', 'UTC') \
        # .config('spark.executor.memory','16G') \
        # .config('spark.driver.memory','16G') \
        # .config("spark.driver.maxResultSize", "4G") \
        #.getOrCreate()
         #.config('spark.sql.repl.eagerEval.enabled', True) \
         #.config('spark.ui.showConsoleProgress', True) \





In [20]:
start_time = time.time()
sdf = spark.createDataFrame(data)
print('%0.2f min: Lags' % ((time.time() - start_time) / 60))
sdf = sdf.withColumnRenamed('Sales Date', 'ds')\
        .withColumnRenamed('Super Division', 'Super_Division')\
        .withColumnRenamed('Product Division', 'Product_Division')\
        .withColumnRenamed('Rslr Sales Quantity', 'Rslr_Sales_Qunatity')\
        .withColumnRenamed('Reseller City', 'Reseller_City')\
        .withColumnRenamed('Business Unit', 'Business_Unit')

sdf.printSchema()

sdf.createOrReplaceTempView('sales')

0.04 min: Lags
root
 |-- Store_names: string (nullable = true)
 |-- Reseller_City: string (nullable = true)
 |-- Super_Division: string (nullable = true)
 |-- Product_Division: string (nullable = true)
 |-- Business_Unit: string (nullable = true)
 |-- ds: timestamp (nullable = true)
 |-- Rslr_Sales_Qunatity: integer (nullable = true)
 |-- Rslr Sales Amount: integer (nullable = true)
 |-- black_week: integer (nullable = true)
 |-- christmas: integer (nullable = true)



In [21]:


spark.sql("select Store_names, Reseller_City,  Business_Unit, count(*) from sales group by Store_names, Reseller_City, Business_Unit order by Reseller_City, Business_Unit").show()

sql = 'SELECT Store_names, Reseller_City, Super_Division, Product_Division, Business_Unit, black_week, christmas, ds, sum(Rslr_Sales_Qunatity) as y FROM sales GROUP BY Store_names, Reseller_City, Super_Division, Product_Division, Business_Unit, black_week, christmas, ds ORDER BY Store_names, Reseller_City,  Super_Division,  Business_Unit, ds'

sdf.explain()


+--------------------+-------------+---------------+--------+
|         Store_names|Reseller_City|  Business_Unit|count(1)|
+--------------------+-------------+---------------+--------+
|  Media Markt Aachen|       Aachen|           M365|      61|
|       Saturn Aachen|       Aachen|           M365|      61|
|  Media Markt Aachen|       Aachen|Office Standard|      61|
|       Saturn Aachen|       Aachen|Office Standard|      61|
|   Media Markt Aalen|        Aalen|           M365|      61|
|   Media Markt Aalen|        Aalen|Office Standard|      61|
|Media Markt Berli...|  Ahrensfelde|           M365|      61|
|Media Markt Berli...|  Ahrensfelde|Office Standard|      61|
|Media Markt Albstadt|     Albstadt|           M365|      61|
|Media Markt Albstadt|     Albstadt|Office Standard|      61|
|   Media Markt Alzey|        Alzey|           M365|      61|
|   Media Markt Alzey|        Alzey|Office Standard|      61|
|  Media Markt Amberg|       Amberg|           M365|      61|
|  Media

In [22]:
sdf.rdd.getNumPartitions()

8

In [23]:
### make partitions of data based on number of cores in the local system
spark.sql(sql).show()

store_part = (spark.sql(sql).repartition(spark.sparkContext.defaultParallelism, ['Store_names','Business_Unit'])).cache()
store_part.explain()

+-------------------+-------------+--------------------+----------------+-------------+----------+---------+-------------------+---+
|        Store_names|Reseller_City|      Super_Division|Product_Division|Business_Unit|black_week|christmas|                 ds|  y|
+-------------------+-------------+--------------------+----------------+-------------+----------+---------+-------------------+---+
| Saturn Köln-Weiden|         Köln|EDG Managed - Off...|   M365 Consumer|         M365|         0|        0|2017-01-31 00:00:00| 23|
| Saturn Köln-Weiden|         Köln|EDG Managed - Off...|   M365 Consumer|         M365|         0|        0|2017-02-28 00:00:00| 11|
| Saturn Köln-Weiden|         Köln|EDG Managed - Off...|   M365 Consumer|         M365|         0|        0|2017-03-31 00:00:00| 16|
| Saturn Köln-Weiden|         Köln|EDG Managed - Off...|   M365 Consumer|         M365|         0|        0|2017-04-30 00:00:00| 10|
| Saturn Köln-Weiden|         Köln|EDG Managed - Off...|   M365 Consu

In [25]:
### make the resultant schema
result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('Store_names',StringType()),
    StructField('Reseller_City',StringType()),
    StructField('Super_Division',StringType()),
    StructField('Product_Division',StringType()),
    StructField('Business_Unit',StringType()),
    StructField('y',DoubleType()),
    StructField('yhat',DoubleType()),
    StructField('yhat_upper',DoubleType()),
    StructField('yhat_lower',DoubleType())
])

In [26]:
### create the holiday dataframe 

lockdown1 = pd.date_range('2020-03-22', '2020-05-03', freq ='m').to_list()
lockdown2 = pd.date_range('2020-12-13', '2021-03-07', freq ='m').to_list()
lockdown = lockdown1+lockdown2


lock_down = pd.DataFrame({
    'holiday': 'lock_down',
    'ds' : pd.to_datetime(lockdown)})


# christmas = ['2018-12-24', '2018-12-30', '2019-12-22', '2019-12-29',
#             '2020-12-21','2020-12-28', '2021-12-24','2021-12-31']


# christmas = pd.DataFrame({
#     'holiday': 'christmas',
#     'ds' : pd.to_datetime(christmas)})



# holiday = pd.concat([lock_down, christmas])

In [28]:
##### city-wise prophet function 
@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast_sales( store_pd):
    
    model = Prophet(interval_width=0.95, holidays = lock_down)
    model.add_country_holidays(country_name='DE')
    model.add_regressor('black_week')
    model.add_regressor('christmas')

    black_week = dict(zip(store_pd['ds'], store_pd['black_week']))
    christmas = dict(zip(store_pd['ds'], store_pd['christmas']))
    
    train = store_pd[store_pd['ds']<='2022-02-28']
    future_pd = store_pd[store_pd['ds']>'2022-02-28'].set_index('ds')

    train['date_index'] = train['ds']
    train['date_index'] = pd.to_datetime(train['date_index'])
    train = train.set_index('date_index')


    model.fit(train[['ds', 'y', 'black_week', 'christmas']])
    future = model.make_future_dataframe(periods=1, freq='m')
    future['black_week'] = future['ds'].map(black_week)
    future['christmas'] = future['ds'].map(christmas)

    #future['black_week'] = future['ds'].apply(black_week)
    #future['promos'] = future['ds'].apply(promos)

    forecast_pd = model.predict(future[['ds', 'black_week', 'christmas']])  


    #forecast_pd = model.predict(future_pd[['ds', 'black_week', 'EOL_Promos']])  

    f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')

    #store_pd = store_pd.filter(store_pd['ds']<'2021-10-01 00:00:00')
    st_pd = store_pd[[ 'ds', 'Store_names', 'Reseller_City', 'Super_Division', 'Product_Division', 'Business_Unit','y']].set_index('ds')
    #st_pd = pd.concat([st_pd1, st_pd2])

    results_pd = f_pd.join( st_pd, how='left' )
    results_pd.reset_index(level=0, inplace=True)

    #results_pd[['Reseller_City','Business_Unit']] = future_pd[['Reseller_City','Business_Unit']].iloc[0]

    return results_pd[ ['ds', 'Store_names', 'Reseller_City', 'Super_Division', 'Product_Division', 'Business_Unit','y', 'yhat', 'yhat_upper', 'yhat_lower'] ]

In [29]:
#### run the spark by store-wise
import time
results = (
    store_part
    .groupBy(['Store_names','Business_Unit'])
    .apply(forecast_sales)
    .withColumn('training_date', current_date() )
    )

### cache the results 
start_time = time.time()
results.cache()
print('%0.2f min: Lags' % ((time.time() - start_time) / 60))

results.explain()
results = results.coalesce(1)

### convert the result from sparkdataframe to panadas datafrme 
start_time = time.time()
final_df = results.toPandas()

print('%0.2f min: Lags' % ((time.time() - start_time) / 60))

0.00 min: Lags
== Physical Plan ==
InMemoryTableScan [ds#368, Store_names#369, Reseller_City#370, Super_Division#371, Product_Division#372, Business_Unit#373, y#374, yhat#375, yhat_upper#376, yhat_lower#377, training_date#388]
   +- InMemoryRelation [ds#368, Store_names#369, Reseller_City#370, Super_Division#371, Product_Division#372, Business_Unit#373, y#374, yhat#375, yhat_upper#376, yhat_lower#377, training_date#388], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Project [ds#368, Store_names#369, Reseller_City#370, Super_Division#371, Product_Division#372, Business_Unit#373, y#374, yhat#375, yhat_upper#376, yhat_lower#377, 2022-03-04 AS training_date#388]
            +- FlatMapGroupsInPandas [Store_names#0, Business_Unit#75], forecast_sales(Store_names#0, Reseller_City#64, Super_Division#31, Product_Division#42, Business_Unit#75, black_week#8, christmas#9, ds#20, y#162L), [ds#368, Store_names#369, Reseller_City#370, Super_Division#371, Product_Division#372, B

In [33]:
final_df['yhat'] = np.where(final_df['yhat']<0, 0, final_df['yhat'])
final_df['yhat_upper'] = np.where(final_df['yhat_upper']<0, 0, final_df['yhat_upper'])
final_df['yhat_lower'] = np.where(final_df['yhat_lower']<0, 0, final_df['yhat_lower'])

In [37]:
test_results = final_df.loc[final_df['ds']>='2022-03-31']

In [38]:
test_results.head()

Unnamed: 0,ds,Store_names,Reseller_City,Super_Division,Product_Division,Business_Unit,y,yhat,yhat_upper,yhat_lower,training_date
60,2022-03-31,Fil. Media Markt Würzburg City,Würzburg,EDG Managed - Office License and HP,M365 Consumer,M365,0.0,14.54,26.62,2.04,2022-03-04
121,2022-03-31,Media Markt Backnang,Backnang,EDG Managed - Office License and HP,M365 Consumer,M365,0.0,12.97,25.83,0.16,2022-03-04
182,2022-03-31,Media Markt Bad Dürrheim,Bad Dürrheim,EDG Managed - Office License and HP,Office Standard,Office Standard,0.0,51.61,78.23,23.59,2022-03-04
243,2022-03-31,Media Markt Bayreuth,Bayreuth,EDG Managed - Office License and HP,Office Standard,Office Standard,0.0,9.67,24.44,0.0,2022-03-04
304,2022-03-31,Media Markt Berlin-Charlottenburg,Berlin,EDG Managed - Office License and HP,M365 Consumer,M365,0.0,30.11,51.92,10.8,2022-03-04


In [39]:
from sklearn.metrics import mean_absolute_error
rmse_pred = mean_absolute_error(test_results['y'], test_results['yhat'])
    
print("Root Mean Absolute Error_store:" , np.sqrt(rmse_pred))

Root Mean Absolute Error_store: 4.90451356389608


In [40]:
normalize_rmse = np.sqrt(rmse_pred)/(test_results['y'].max()-test_results['y'].min())
print("Normalize RMSE:" , normalize_rmse)

Normalize RMSE: inf


In [41]:
final_df = final_df.drop_duplicates()
final_df.to_csv(r'output_files/office_store_historic.csv', index=False)

predicted_data = final_df.loc[final_df['ds']> '2022-02-28']

predicted_data = round_target_column(predicted_data, 'yhat')
predicted_data = round_target_column(predicted_data, 'yhat_lower')
predicted_data = round_target_column(predicted_data, 'yhat_upper')

predicted_data.to_csv(r'output_files/office_store_monthly.csv', index=False)

In [42]:
stores=final_df['Store_names'].unique()

In [43]:
final_df['Business_Unit'].unique()

array(['M365', 'Office Standard'], dtype=object)

In [44]:
stores = final_df['Store_names'].unique()

In [45]:
import plotly.graph_objs as go
def plot_fig(df, store, item):
    fig = go.Figure()
    # Create and style traces
    forecast = df.loc[df['Store_names'] == store]
    print(store, item)
    forecast = forecast.loc[forecast['Business_Unit'] == item]
    fig.add_trace(go.Scatter(x=forecast['ds'], y=forecast['y'], name='Actual',))
    fig.add_trace(go.Scatter(x=forecast['ds'], y=forecast['yhat'], name='Predicted',))
    #fig.add_trace(go.Scatter(x=forecast['ds'], y=forecast['yhat_lower'], name='lowelimit',))
    fig.add_trace(go.Scatter(x=forecast['ds'], y=forecast['yhat_upper'], name='upperlimit',))
    #fig.add_trace(go.Scatter(x=forecast['ds'], y=forecast['promos'], name='promos',))
    fig.show()

In [46]:
plot_fig(final_df, stores[300], 'M365')

Saturn Münster 2 Arkaden M365


In [39]:
from lightgbm import LGBMRegressor
import lightgbm as lgb
import joblib
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import TimeSeriesSplit
import os, sys, gc, time, warnings, pickle, psutil, random



### make the resultant schema
final_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('Store_names',StringType()),
    StructField('Business_Unit',StringType()),
    StructField('black_week',DoubleType()),
    StructField('y',DoubleType()),
    StructField('y_prediction',DoubleType())
])


@pandas_udf( final_schema, PandasUDFType.GROUPED_MAP )
def PA_effect_per_store(store_data):

    lockdown1 = pd.date_range('2020-03-22', '2020-05-04', freq ='m')
    lockdown2 = pd.date_range('2020-11-02', '2021-01-10', freq ='m')


    def conditions(data):
        if (data['Sales Date'] in lockdown1) or (data['Sales Date'] in  lockdown2):
            return 0
        else:
            return 1

    store_data['Store_names'] = store_data['Store_names'].astype('category')
    store_data['Business_Unit'] = store_data['Business_Unit'].astype('category')

    store_data['Year'] = store_data['Sales Date'].dt.year
    store_data['Month'] = store_data['Sales Date'].dt.month
    store_data['corona_lockdown'] = store_data.apply(conditions, axis=1)



    lgb_params = {'boosting_type': ['gbdt'],
                  'objective': ['tweedie'],
                  'tweedie_variance_power': [1.1],
                  'n_estimators': [500],
                  'metric': ['rmse'],
                  'max_depth': [30, 50, 70],
                  'num_leaves': [250, 500, 1000],
                  'learning_rate': [0.01, 0.1],
                  'feature_fraction': [0.5, 0.7],
                 'bagging_fraction': [0.5, 0.7],}

    reg = lgb.LGBMRegressor()


    ## Set n_iter_search to be a higher value for actual hyperparameter tuning
    ## This is just to show the workflow
    
    ## define a 3-fold time-series split for cross validation

    tscv = TimeSeriesSplit(n_splits=5)

    #n_iter_search = 1

    random_search = RandomizedSearchCV(reg,
                                    param_distributions=lgb_params,
                                    n_iter=10,
                                    cv=tscv,
                                    scoring='neg_root_mean_squared_error',
                                    n_jobs=-1,
                                    )


    ## Seeder
    
    def seed_everything(seed=0):
        random.seed(seed)
        np.random.seed(seed)

    VER = 1                          
    SEED = 42                        
    seed_everything(SEED)                   
    N_CORES = psutil.cpu_count()     


    #LIMITS and const
    TARGET      = 'y'                        
    END_TRAIN   = '2021-12-31'     
    

    features_columns = ['ds', 'Store_names', 'Business_Unit', 'Year', 'Month', 'black_week','corona_lockdown']


    train_mask = store_data['ds']<= END_TRAIN
    valid_mask = train_mask&(store_data['ds']> '2021-11-30')
    preds_mask = store_data['ds']>END_TRAIN

    train_x = store_data.loc[train_mask][features_columns].set_index('ds')
    train_y = store_data.loc[train_mask][TARGET]

    valid_x = store_data.loc[valid_mask][features_columns].set_index('ds')
    valid_y = store_data.loc[valid_mask][TARGET]

    predict_data = store_data.loc[preds_mask][features_columns].set_index('ds')
    predict_x = store_data.loc[preds_mask][features_columns]



    #seed_everything(SEED)
    random_search.fit(train_x,
                    train_y,
                    eval_metric='rmse',
                    eval_set=[(train_x, train_y), (valid_x, valid_y)],
                    verbose = 1)
    reg_model = random_search.best_estimator_

    prediction = reg_model.predict(predict_x.set_index('ds'))
    predict_data['y_prediction'] = prediction
    predict_data = predict_data.reset_index()

    return predict_data[['ds', 'Store_names', 'Business_Unit', 'black_week', 'y', 'y_prediction']]

In [40]:
import time
final_results = (
    store_part
    .groupBy(['Store_names','Business_Unit'])
    .apply(PA_effect_per_store)
    .withColumn('training_date', current_date() )
    )

### cache the results 
start_time = time.time()
final_results.cache()
print('%0.2f min: Lags' % ((time.time() - start_time) / 60))

final_results.explain()
final_results = final_results.coalesce(1)

### convert the result from sparkdataframe to panadas datafrme 
start_time = time.time()
final_data = final_results.toPandas()

print('%0.2f min: Lags' % ((time.time() - start_time) / 60))

0.00 min: Lags
== Physical Plan ==
InMemoryTableScan [ds#1962, Store_names#1963, Business_Unit#1964, black_week#1965, y#1966, y_prediction#1967, training_date#1974]
   +- InMemoryRelation [ds#1962, Store_names#1963, Business_Unit#1964, black_week#1965, y#1966, y_prediction#1967, training_date#1974], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Project [ds#1479, Store_names#1480, Business_Unit#1481, black_week#1482, y#1483, y_prediction#1484, 2022-01-31 AS training_date#1491]
            +- FlatMapGroupsInPandas [Store_names#0, Business_Unit#68], PA_effect_per_store(Store_names#0, Reseller_City#58, Super_Division#28, Product_Division#38, Business_Unit#68, black_week#8, ds#18, y#149L), [ds#1479, Store_names#1480, Business_Unit#1481, black_week#1482, y#1483, y_prediction#1484]
               +- *(1) Sort [Store_names#0 ASC NULLS FIRST, Business_Unit#68 ASC NULLS FIRST], false, 0
                  +- InMemoryTableScan [Store_names#0, Business_Unit#68, Store_names#0

Py4JJavaError: An error occurred while calling o270.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 630) (Grogu.profiflitzer.local executor driver): java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.io.BufferedInputStream.fill(Unknown Source)
	at java.io.BufferedInputStream.read(Unknown Source)
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:88)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1481)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:99)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3650)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2308)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2309)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3648)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3652)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3629)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3629)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3628)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
	at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.io.BufferedInputStream.fill(Unknown Source)
	at java.io.BufferedInputStream.read(Unknown Source)
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:88)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1481)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:99)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3650)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2308)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
