In [0]:
train_rf_df = spark.read.format("csv") \
                .option("header", True) \
                .option("inferSchema", True) \
                .load("abfss://rawdata@udaystorage.dfs.core.windows.net/trends/df_train4.csv")





In [0]:
x_test = spark.read.format("csv") \
                .option("header", True) \
                .option("inferSchema", True) \
                .load("abfss://rawdata@udaystorage.dfs.core.windows.net/trends/df_test.csv")


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.regression import RandomForestRegressor as RF_reg
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import functools
from pyspark.ml.feature import OneHotEncoder
#import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
from pyspark.ml.evaluation import RegressionEvaluator


In [0]:
# prepare labeled sets    Train
cols_now = ['user_total_orders', 'user_total_items', 'total_distinct_items',
       'user_average_days_between_orders', 'user_average_basket',
       'order_hour_of_day', 'days_since_prior_order', 'days_since_ratio',
       'aisle_id', 'department_id', 'product_orders', 'product_reorders',
       'product_reorder_rate', 'UP_orders', 'UP_orders_ratio',
       'UP_average_pos_in_cart', 'UP_reorder_rate', 'UP_orders_since_last',
       'UP_delta_hour_vs_last']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
label_indexer = StringIndexer(inputCol='label', outputCol='label_double')
pipeline_train = Pipeline(stages=[assembler_features, label_indexer])

In [0]:
# prepare labeled sets     x_test
cols_now = ['user_total_orders', 'user_total_items', 'total_distinct_items',
       'user_average_days_between_orders', 'user_average_basket',
       'order_hour_of_day', 'days_since_prior_order', 'days_since_ratio',
       'aisle_id', 'department_id', 'product_orders', 'product_reorders',
       'product_reorder_rate', 'UP_orders', 'UP_orders_ratio',
       'UP_average_pos_in_cart', 'UP_reorder_rate', 'UP_orders_since_last',
       'UP_delta_hour_vs_last']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
pipeline_test = Pipeline(stages=[assembler_features])

In [0]:
trainData = pipeline_train.fit(train_rf_df).transform(train_rf_df)

testData = pipeline_test.fit(x_test).transform(x_test)

In [0]:
#Grid search
rf = RF(labelCol='label_double', featuresCol='features')
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [1, 2]) \
    .addGrid(rf.maxDepth, [2]) \
    .addGrid(rf.maxBins, [2]) \
    .build()
pipeline = Pipeline([rf])

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=4)  # use 3+ folds in practice


Method __init__ forces keyword arguments.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 103, in wrapper
    raise TypeError("Method %s forces keyword arguments." % func.__name__)
TypeError: Method __init__ forces keyword arguments.



In [0]:
cvModel = crossval.fit(training)

In [0]:
rf = RF_reg(labelCol='label_double', featuresCol='features',numTrees=30, maxDepth=10, maxBins=100)
fit = rf.fit(trainData)
# transformed = fit.transform(testData)

In [0]:
transformed = fit.transform(testData)

In [0]:
testData.show(2)

+---+--------+----------+-----------------+----------------+--------------------+--------------------------------+-------------------+-----------------+----------------------+----------------+--------+-------------+--------------+----------------+--------------------+---------+---------------+----------------------+---------------+--------------------+---------------------+------------------+--------------------+
|_c0|order_id|product_id|user_total_orders|user_total_items|total_distinct_items|user_average_days_between_orders|user_average_basket|order_hour_of_day|days_since_prior_order|days_since_ratio|aisle_id|department_id|product_orders|product_reorders|product_reorder_rate|UP_orders|UP_orders_ratio|UP_average_pos_in_cart|UP_reorder_rate|UP_orders_since_last|UP_delta_hour_vs_last|              pred|            features|
+---+--------+----------+-----------------+----------------+--------------------+--------------------------------+-------------------+-----------------+--------------

In [0]:
result = transformed.select(['order_id', 'product_id', 'pred'])

In [0]:
from io import StringIO
from azure.storage.blob import BlobServiceClient

csv_buffer = StringIO()
result.to_csv(csv_buffer, index=False)

storage_account_name = "udaystorage"
container_name = "rawdata"
blob_name = "df.csv"

connection_string = "DefaultEndpointsProtocol=https;AccountName=udaystorage;AccountKey=AbC123XyZ890RandomKeyHere9876543210==;EndpointSuffix=core.windows.net"

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
blob_client.upload_blob(csv_buffer.getvalue(), overwrite=True)


No module named boto3
Traceback (most recent call last):
ImportError: No module named boto3



In [0]:
result.write.format("csv").option("header", "true").save("abfss://rawdata@udaystorage.dfs.core.windows.net/trends/file.csv")
