In [1]:
import pyspark.sql.functions as f 

# Load the de-duplicated stage-1 lifespan feature table and cache it; later cells query it repeatedly.
dataframe = (
    spark
    .sql("select distinct * from consumables.ecm_pdc_lifespan_relevant_features_stage_1 ")
    .cache()
)

In [2]:
# List every MachineState value present in the data.
dataframe.select('MachineState').distinct().show()

In [3]:
# 1 for rows in PRINT_STATE; every other row (including a null MachineState) gets null,
# because when() without otherwise() yields null.
is_printing_flag = f.when(f.col('MachineState') == 'PRINT_STATE', f.lit(1))
dataframe = dataframe.withColumn('isPrintingState', is_printing_flag)

In [4]:
from rulex.contiguous_time_series import get_contiguous_data

# Inputs for get_contiguous_data: split on isPrintingState within each (press, replacement_id),
# ordered by rawTime then TotalImpressions.
# NOTE(review): get_contiguous_data's exact semantics are defined in rulex, not visible here.
group_by_columns = ['press', 'replacement_id']
order_by_columns = ['rawTime', 'TotalImpressions']
date_column = 'rawTime'
column_names = ['isPrintingState']

contiguous_printing = get_contiguous_data(
    dataframe,
    column_names,
    group_by_columns,
    order_by_columns,
    date_column,
    ts_min_days=0,
).cache()

# Compare row counts before and after extracting the contiguous series.
original_row_count = dataframe.count()
contiguous_row_count = contiguous_printing.count()
print("Original Dataset:{}\nNew Dataset:{}rows".format(original_row_count, contiguous_row_count))

### Session Type Analysis

In [6]:
# One row per session (time_series_id): number of readings, and the span of rawTime and
# TotalImpressions it covers.
# NOTE(review): time_delta's unit/type depends on rawTime's type (epoch number vs timestamp) — confirm.
session_aggregations = [
    f.count('time_series_id').alias("rows_count"),
    (f.max('rawTime') - f.min('rawTime')).alias("time_delta"),
    (f.max('TotalImpressions') - f.min('TotalImpressions')).alias("impressions_delta"),
]
contiguous_printing_agg = contiguous_printing.groupBy('time_series_id').agg(*session_aggregations)

In [7]:
# Classify each session by impressions printed. Branches are evaluated in order:
#   < 30 -> Calibration, 30..150 -> Short, > 1500 -> Long, anything else -> Medium.
# A null impressions_delta matches no branch and therefore also lands in "Medium Session".
impressions_col = f.col('impressions_delta')
session_type_expr = (
    f.when(impressions_col < 30, 'Calibration Session')
    .when(impressions_col <= 150, 'Short Session')
    .when(impressions_col > 1500, 'Long Session')
    .otherwise("Medium Session")
)
contiguous_printing_agg = contiguous_printing_agg.withColumn('Session_type', session_type_expr)

display(contiguous_printing_agg.groupBy('Session_type').count())

In [8]:
# Attach each reading (row) to its session-level aggregates, including Session_type.
contiguous_printing_new = contiguous_printing.join(contiguous_printing_agg, on="time_series_id", how='inner')

# Every category the Session_type classification can emit. Passing them to pivot() explicitly
# guarantees each column exists (so the f.col(...) references below cannot fail when a category
# never occurs) and avoids the extra job pivot() otherwise runs to discover distinct values.
# Alphabetical order matches the column order pivot() produces when it discovers values itself.
session_type_values = ['Calibration Session', 'Long Session', 'Medium Session', 'Short Session']

# Count distinct sessions per type. contiguous_printing_new has one row per reading, so a plain
# .count() would count readings (weighting each session by its length) rather than sessions,
# which is what the session_count / "... Session ratio" columns below describe.
contiguous_printing_session_type = (contiguous_printing_new
  .groupBy('press', 'replacement_id', 'LifeSpan')
  .pivot('Session_type', session_type_values)
  .agg(f.countDistinct('time_series_id'))
  .na.fill(0))

# Non-calibration session types; session_count sums only these.
# NOTE(review): Calibration sessions are excluded from session_count, so 'Calibration Session ratio'
# uses a base that does not include them (and can exceed 1) — confirm this base is intended.
columns_list = ['Long Session', 'Medium Session', 'Short Session']

contiguous_printing_session_type = contiguous_printing_session_type.withColumn(
    'session_count', f.col('Long Session') + f.col('Medium Session') + f.col('Short Session'))

# Share of each session type relative to session_count. A group with only calibration sessions has
# session_count == 0; the guard yields null there instead of dividing by zero (an error under ANSI mode).
for session_type in columns_list + ['Calibration Session']:
    contiguous_printing_session_type = contiguous_printing_session_type.withColumn(
        session_type + ' ratio',
        f.when(f.col('session_count') > 0, f.col(session_type) / f.col('session_count')))

display(contiguous_printing_session_type.limit(10))

In [9]:
# Pearson correlation of each session statistic with LifeSpan, one output column per statistic.
corr_list = ['Long Session', 'Medium Session', 'Short Session', 'session_count', 'Long Session ratio', 'Medium Session ratio', 'Short Session ratio', 'Calibration Session ratio']
corr_exprs = [f.corr(f.col(stat), 'LifeSpan').alias(stat + '_corr') for stat in corr_list]
display(contiguous_printing_session_type.select(*corr_exprs))

## Filtering Sessions by Length, Impressions and Time

In [11]:
# Quartiles (25/50/75%) of each per-session metric, with 0.1% relative error.
quartile_probs = [0.25, 0.5, 0.75]
rows_count, time_delta, impressions_delta = [
    contiguous_printing_agg.approxQuantile(metric, quartile_probs, 0.001)
    for metric in ('rows_count', 'time_delta', 'impressions_delta')
]

print("Rows Count: {0}\n Time Delta: {1}\n Impressions Delta: {2}".format(rows_count, time_delta, impressions_delta))

In [12]:
# Quartiles of session duration (same computation as time_delta in the previous cell).
contiguous_printing_agg.approxQuantile(col='time_delta', probabilities=[0.25, 0.5, 0.75], relativeError=0.001)

In [13]:
contiguous_printing_agg  # NOTE(review): a bare DataFrame presumably echoes only its repr (columns/types), not rows; use display(...) to see data

In [14]:
# Number of sessions above the "Long Session" threshold (> 1500 impressions).
contiguous_printing_agg.where(f.col('impressions_delta') > 1500).count()

In [15]:
# Inspect sessions with at least 1000 impressions.
display(contiguous_printing_agg.where(f.col('impressions_delta') >= 1000))

In [16]:
# Keep only substantial sessions: >= 1000 impressions, time_delta > 100 (units follow rawTime),
# and at least 30 readings.
contiguous_printing_agg_filtered = contiguous_printing_agg.where(
    (f.col('impressions_delta') >= 1000)
    & (f.col('time_delta') > 100)
    & (f.col('rows_count') >= 30)
)

In [17]:
# Filtered sessions are only inspected here; the join in the following cells uses the unfiltered contiguous_printing_agg.
display(contiguous_printing_agg_filtered)

In [18]:
# Row-level data joined with each row's session aggregates (same join as contiguous_printing_new).
contiguous_printing2 = contiguous_printing.join(contiguous_printing_agg, "time_series_id", 'inner')

In [19]:
#contiguous_printing_agg.coalesce(1).write.csv('mnt/testdev-input-csv/contiguous_imp.csv', 'overwrite')

In [20]:
# Number of raw rows in PRINT_STATE, for comparison with the contiguous-series row count.
dataframe.where(f.col('MachineState') == 'PRINT_STATE').count()

In [21]:
# Remove the machine-state columns used only to build the contiguous series.
contiguous_printing2 = contiguous_printing2.drop("MachineState").drop("isPrintingState")

In [22]:
# Reduce to distinct (replacement_id, time_series_id, impressions_delta, LifeSpan) combinations.
distinct_pip = contiguous_printing2.select('replacement_id', 'time_series_id', 'impressions_delta', 'LifeSpan').distinct()

In [23]:
# Bucket sessions by impressions: > 1500 Long, <= 150 Short, otherwise (incl. null) Medium.
# The two ranges are disjoint, so a single when-chain is equivalent to applying them in sequence.
# NOTE(review): unlike Session_type, this has no "Calibration Session" bucket (< 30 counts as Short) — confirm intended.
impressions_delta_col = f.col('impressions_delta')
distinct_pip = distinct_pip.withColumn(
    'Session',
    f.when(impressions_delta_col > 1500, 'Long Session')
    .when(impressions_delta_col <= 150, 'Short Session')
    .otherwise("Medium Session"),
)

In [24]:
# Number of rows that will be exported below.
distinct_pip.count()

In [25]:
# Export as a single CSV part file. header=True so the file carries its column names
# (replacement_id, time_series_id, impressions_delta, LifeSpan, Session); without it they are lost.
# NOTE(review): the path is relative ('mnt/...', not '/mnt/...') — confirm it resolves to the intended mount.
distinct_pip.coalesce(1).write.csv('mnt/testdev-input-csv/distinct_pip_imp.csv', mode='overwrite', header=True)

In [26]:
# Show sessions grouped by replacement, in time_series_id order.
display(distinct_pip.sort(f.col('replacement_id'), f.col('time_series_id')))