This series of tutorials can be found at:

https://www.analyticsvidhya.com/blog/2016/09/comprehensive-introduction-to-apache-spark-rdds-dataframes-using-pyspark/

https://www.analyticsvidhya.com/blog/2016/10/using-pyspark-to-perform-transformations-and-actions-on-rdd/

https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

To use Jupyter Notebook with PySpark, see: https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f

In [1]:
import findspark
findspark.init()
import pyspark
import random

Creating a SparkContext

In [2]:
from pyspark import SparkContext
sc = SparkContext()

In [3]:
sc.version

'2.2.0'

### Loading CSV files

In [4]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
transactions = sqlContext.read.csv('../../../data/raw/transactions.csv',header=True)
user_logs = sqlContext.read.csv('../../../data/raw/user_logs.csv',header=True)
train = sqlContext.read.csv('../../../data/raw/train.csv',header=True)
test = sqlContext.read.csv('../../../data/raw/sample_submission_zero.csv',header=True)
members = sqlContext.read.csv('../../../data/raw/members.csv',header=True)

First 10 rows

In [5]:
train.take(10)

[Row(msno='waLDQMmcOu2jLDaV1ddDkgCrB/jl6sD66Xzs0Vqax1Y=', is_churn='1'),
 Row(msno='QA7uiXy8vIbUSPOkCf9RwQ3FsT8jVq2OxDr8zqa7bRQ=', is_churn='1'),
 Row(msno='fGwBva6hikQmTJzrbz/2Ezjm5Cth5jZUNvXigKK2AFA=', is_churn='1'),
 Row(msno='mT5V8rEpa+8wuqi6x0DoVd3H5icMKkE9Prt49UlmK+4=', is_churn='1'),
 Row(msno='XaPhtGLk/5UvvOYHcONTwsnH97P4eGECeq+BARGItRw=', is_churn='1'),
 Row(msno='GBy8qSz16X5iYWD+3CMxv/Hm6OPSrXBYtmbnlRtknW0=', is_churn='1'),
 Row(msno='lYLh7TdkWpIoQs3i3o6mIjLH8/IEgMWP9r7OpsLX0Vo=', is_churn='1'),
 Row(msno='T0FF6lumjKcqEO0O+tUH2ytc+Kb9EkeaLzcVUiTr1aE=', is_churn='1'),
 Row(msno='Nb1ZGEmagQeba5E+nQj8VlQoWl+8SFmLZu+Y8ytIamw=', is_churn='1'),
 Row(msno='MkuWz0Nq6/Oq5fKqRddWL7oh2SLUSRe3/g+XmAWqW1Q=', is_churn='1')]

In [6]:
train.count()

992931

The problem: users in transactions and user_logs are not unique.

That is, the same user (msno) may have multiple entries in these dataframes.

Number of unique users in transactions

In [17]:
transactions.select('msno').distinct().count()

2363626

Total number of entries in transactions

In [18]:
transactions.count()

21547746

Number of unique users in user_logs

In [19]:
user_logs.select('msno').distinct().count()

5234111

Total number of entries in user_logs

In [20]:
user_logs.count()

392106543
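A quick worked check on the four counts above: dividing total entries by distinct users gives the average number of rows per user in each table, which is what makes a naive join on `msno` multiply rows.

```python
# Counts reported by the cells above.
transactions_rows, transactions_users = 21547746, 2363626
user_logs_rows, user_logs_users = 392106543, 5234111

# Average number of rows per distinct msno in each table.
tx_per_user = transactions_rows / transactions_users
logs_per_user = user_logs_rows / user_logs_users

print(round(tx_per_user, 2), round(logs_per_user, 2))  # 9.12 74.91
```

These are averages only; individual users can have many more rows than this.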

Rename the msno column in train to avoid duplicated column names after the join

In [7]:
train = train.withColumnRenamed('msno', 'msno_1')

In [8]:
train1 = train.join(members, train['msno_1'] == members['msno'], 'left')

Spark evaluates lazily: the join above only defines the result, and the dataframe is actually computed when an action such as count() is called.

In [9]:
train1.count()

992931

In [10]:
train1 = train1.drop('msno')

To check that the join was done correctly:

In [11]:
train1.select('msno_1').distinct().orderBy('msno_1').head(10)

[Row(msno_1='+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o='),
 Row(msno_1='+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw='),
 Row(msno_1='+++snpr7pmobhLKUgSHTv/mpkqgBT0tQJ0zQj6qKrqc='),
 Row(msno_1='++/9R3sX37CjxbY/AaGvbwr3QkwElKBCtSvVzhCBDOk='),
 Row(msno_1='++/UDNo9DLrxT8QVGiDi1OnWfczAdEwThaVyD0fXO50='),
 Row(msno_1='++/ZHqwUNa7U21Qz+zqteiXlZapxey86l6eEorrak/g='),
 Row(msno_1='++0/NopttBsaAn6qHZA2AWWrDg7Me7UOMs1vsyo4tSI='),
 Row(msno_1='++0BJXY8tpirgIhJR14LDM1pnaRosjD1mdO1mIKxlJA='),
 Row(msno_1='++0EzISdtKY48Z0GY62jer/LFQwrNIAbADdtU5xStGY='),
 Row(msno_1='++0wqjjQge1mBBe5r4ciHGKwtF/m322zkra7CK8I+Mw=')]

In [12]:
train.orderBy('msno_1').head(10)

[Row(msno_1='+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=', is_churn='0'),
 Row(msno_1='+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=', is_churn='0'),
 Row(msno_1='+++snpr7pmobhLKUgSHTv/mpkqgBT0tQJ0zQj6qKrqc=', is_churn='0'),
 Row(msno_1='++/9R3sX37CjxbY/AaGvbwr3QkwElKBCtSvVzhCBDOk=', is_churn='0'),
 Row(msno_1='++/UDNo9DLrxT8QVGiDi1OnWfczAdEwThaVyD0fXO50=', is_churn='0'),
 Row(msno_1='++/ZHqwUNa7U21Qz+zqteiXlZapxey86l6eEorrak/g=', is_churn='0'),
 Row(msno_1='++0/NopttBsaAn6qHZA2AWWrDg7Me7UOMs1vsyo4tSI=', is_churn='0'),
 Row(msno_1='++0BJXY8tpirgIhJR14LDM1pnaRosjD1mdO1mIKxlJA=', is_churn='0'),
 Row(msno_1='++0EzISdtKY48Z0GY62jer/LFQwrNIAbADdtU5xStGY=', is_churn='0'),
 Row(msno_1='++0wqjjQge1mBBe5r4ciHGKwtF/m322zkra7CK8I+Mw=', is_churn='0')]

Issue: when the join key is duplicated in both dataframes we're trying to join, the resulting dataframe has more rows than either of its "parents".

To illustrate, if we have df1:

    Id | field_C | field_D
     A |   black | 11
     A |   white | 19
     A |  yellow | 20

and df2:

    Id | field_A | field_B
     A |   cat   |  12
     A |   dog   | 128
     A |   dog   |  35

When joining the two by Id, we end up with:

    Id | field_A | field_B | field_C | field_D
     A |   cat   |  12     |   black | 11
     A |   dog   | 128     |   black | 11
     A |   dog   |  35     |   black | 11
     A |   cat   |  12     |   white | 19
     A |   dog   | 128     |   white | 19
     A |   dog   |  35     |   white | 19
     A |   cat   |  12     |  yellow | 20
     A |   dog   | 128     |  yellow | 20
     A |   dog   |  35     |  yellow | 20

In general, a key value that appears in $k$ rows of the first dataframe and in $m$ rows of the second produces $k \times m$ rows with that value in the joined result.
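The row-count rule can be checked in plain Python on the two toy tables above. This only illustrates join semantics; it is not how Spark executes joins.

```python
from collections import Counter

# The two toy tables from the example above, as (Id, value, value) tuples.
df1 = [("A", "black", 11), ("A", "white", 19), ("A", "yellow", 20)]
df2 = [("A", "cat", 12), ("A", "dog", 128), ("A", "dog", 35)]

# Naive inner join on Id: every matching pair of rows yields one output row,
# laid out as (Id, field_A, field_B, field_C, field_D).
joined = [(l[0],) + r[1:] + l[1:] for l in df1 for r in df2 if l[0] == r[0]]

# Predicted size: sum over shared keys of k * m.
k = Counter(row[0] for row in df1)
m = Counter(row[0] for row in df2)
predicted = sum(k[key] * m[key] for key in k.keys() & m.keys())

print(len(joined), predicted)  # 9 9
```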

If we drop all duplicated msnos, we avoid this issue but suffer an unaffordable loss of information. If we don't drop anything, we end up with a dataframe of almost 5 billion rows... I tried.

The duplicated msnos in **transactions** mostly come from auto-renewal records: each monthly auto-renewal for the same user is recorded as a separate entry, and these entries differ only in their dates. We could probably aggregate this information into a new column such as "number of autorenewals"? Will talk with dear captain when I get the chance.

The duplicated msnos in **user_logs** come from actual daily playing records for the same user. This information is very valuable and shouldn't be neglected, but we could use a mean/median to summarize each user's behaviour.

Anyhow, we now have an effective way to manipulate even very large dataframes with PySpark: we can apply standard pandas-style operations such as filter, select, join and aggregate, as well as map and reduce. The next step is to discuss how we want our data processed.

In [80]:
transactions.orderBy('msno').head(100)

[Row(msno='+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=', payment_method_id='35', payment_plan_days='7', plan_list_price='0', actual_amount_paid='0', is_auto_renew='0', transaction_date='20160909', membership_expire_date='20160914', is_cancel='0'),
 Row(msno='+++IZseRRiQS9aaSkH6cMYU6bGDcxUieAi/tH67sC5s=', payment_method_id='38', payment_plan_days='410', plan_list_price='1788', actual_amount_paid='1788', is_auto_renew='0', transaction_date='20151121', membership_expire_date='20170104', is_cancel='0'),
 Row(msno='+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=', payment_method_id='41', payment_plan_days='30', plan_list_price='99', actual_amount_paid='99', is_auto_renew='1', transaction_date='20170215', membership_expire_date='20170315', is_cancel='0'),
 Row(msno='+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=', payment_method_id='41', payment_plan_days='30', plan_list_price='99', actual_amount_paid='99', is_auto_renew='1', transaction_date='20161215', membership_expire_date='20170115', is_can

In [79]:
user_logs.orderBy('msno').head(100)

[Row(msno='+++4vcS9aMH7KWdfh5git6nA5fC5jjisd5H/NcM++WM=', date='20150427', num_25='1', num_50='1', num_75='0', num_985='0', num_100='0', num_unq='2', total_secs='97.4110'),
 Row(msno='+++EI4HgyhgcJHIPXk/VRP7bt17+2joG39T6oEfJ+tc=', date='20160420', num_25='2', num_50='0', num_75='0', num_985='0', num_100='0', num_unq='1', total_secs='56.8680'),
 Row(msno='+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=', date='20160915', num_25='0', num_50='0', num_75='0', num_985='1', num_100='3', num_unq='4', total_secs='834.1890'),
 Row(msno='+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=', date='20160913', num_25='3', num_50='0', num_75='0', num_985='2', num_100='75', num_unq='61', total_secs='19967.7380'),
 Row(msno='+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=', date='20160914', num_25='2', num_50='0', num_75='0', num_985='0', num_100='16', num_unq='18', total_secs='4253.9640'),
 Row(msno='+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=', date='20160912', num_25='0', num_50='0', num_75='0', num_985='0', n

To simply drop entries with duplicated msno before joining (not what we want, since it throws information away):

In [83]:
# Keep one (arbitrary) row per user on each side before joining; this discards information
train1 = train1.drop_duplicates(subset=['msno_1'])
transactions_drop = transactions.drop_duplicates(subset=['msno'])
train2 = train1.join(transactions_drop, train1['msno_1'] == transactions_drop['msno'], 'left')
train2 = train2.drop('msno')
train2 = train2.drop_duplicates(subset=['msno_1'])
user_logs_drop = user_logs.drop_duplicates(subset=['msno'])
# Inner join: users without any user_logs entry are dropped, hence fewer rows than train
train3 = train2.join(user_logs_drop, train2['msno_1'] == user_logs_drop['msno'], 'inner')

In [84]:
train3.count()

869926

In [17]:
transactions.dtypes

[('msno', 'string'),
 ('payment_method_id', 'string'),
 ('payment_plan_days', 'string'),
 ('plan_list_price', 'string'),
 ('actual_amount_paid', 'string'),
 ('is_auto_renew', 'string'),
 ('transaction_date', 'string'),
 ('membership_expire_date', 'string'),
 ('is_cancel', 'string')]

In [18]:
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType


In [22]:
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

# Convert each date column from its own source column
transactions = transactions.withColumn('transaction_date', func(col('transaction_date')))
transactions = transactions.withColumn('membership_expire_date', func(col('membership_expire_date')))
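The raw dates are `yyyyMMdd` strings (e.g. `'20160111'` in the traceback below), so the `'%m/%d/%Y'` format in the UDF above cannot parse them; because of lazy evaluation, the failure only surfaces when an action runs. A minimal pure-Python sketch of a parser that matches the data (the helper name `parse_yyyymmdd` is hypothetical):

```python
from datetime import date, datetime
from typing import Optional


def parse_yyyymmdd(value: Optional[str]) -> Optional[date]:
    """Parse a 'yyyyMMdd' string such as '20160111' into a date; pass nulls through."""
    if value is None:
        return None
    return datetime.strptime(value, "%Y%m%d").date()


print(parse_yyyymmdd("20160111"))  # 2016-01-11

# The format used in the UDF above does not match this data.
try:
    datetime.strptime("20160111", "%m/%d/%Y")
except ValueError as err:
    print("ValueError:", err)
```

In the notebook this could be wrapped as `udf(parse_yyyymmdd, DateType())`. Since Spark 2.2, the built-in `pyspark.sql.functions.to_date(col, 'yyyyMMdd')` performs the same conversion without a Python UDF, which avoids sending every row to a Python worker.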

In [24]:
transactions.dtypes

[('msno', 'string'),
 ('payment_method_id', 'string'),
 ('payment_plan_days', 'string'),
 ('plan_list_price', 'string'),
 ('actual_amount_paid', 'string'),
 ('is_auto_renew', 'string'),
 ('transaction_date', 'date'),
 ('membership_expire_date', 'date'),
 ('is_cancel', 'string')]

In [26]:
transagg = transactions.groupBy("msno").agg({"payment_method_id": "avg", "payment_plan_days": "avg", "plan_list_price": "avg", "actual_amount_paid": "avg", "is_auto_renew": 'min', "transaction_date": "max", "membership_expire_date": "max", "is_cancel" : "max"})

In [27]:
transagg.count()

2363626

In [29]:
transagg.head(100)

Py4JJavaError: An error occurred while calling o251.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 26.0 failed 1 times, most recent failure: Lost task 3.0 in stage 26.0 (TID 856, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "<ipython-input-22-8f10bb01a713>", line 1, in <lambda>
  File "/usr/lib/python3.5/_strptime.py", line 510, in _strptime_datetime
    tt, fraction = _strptime(data_string, format)
  File "/usr/lib/python3.5/_strptime.py", line 343, in _strptime
    (data_string, format))
ValueError: time data '20160111' does not match format '%m/%d/%Y'



# Ignore materials below

### Machine learning example

https://datahack.analyticsvidhya.com/contest/black-friday/

In [6]:
train = sqlContext.read.csv('../../../data/blackfriday/train.csv', header=True)
test = sqlContext.read.csv('../../../data/blackfriday/test.csv', header=True)

In [7]:
train.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Product_Category_1: string (nullable = true)
 |-- Product_Category_2: string (nullable = true)
 |-- Product_Category_3: string (nullable = true)
 |-- Purchase: string (nullable = true)



In [8]:
train.head(10)

[Row(User_ID='1000001', Product_ID='P00069042', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='3', Product_Category_2=None, Product_Category_3=None, Purchase='8370'),
 Row(User_ID='1000001', Product_ID='P00248942', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='1', Product_Category_2='6', Product_Category_3='14', Purchase='15200'),
 Row(User_ID='1000001', Product_ID='P00087842', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='12', Product_Category_2=None, Product_Category_3=None, Purchase='1422'),
 Row(User_ID='1000001', Product_ID='P00085442', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='12', Product_Category_2='14', Product_Category_3=None, Purchase

Number of rows

In [9]:
train.count()

550068

Missing values: number of rows left in train and test after dropping every row that contains a null

In [10]:
train.na.drop().count(),test.na.drop('any').count()

(166821, 71037)

In [11]:
train = train.fillna(-1)
test = test.fillna(-1)

Summary statistics

In [12]:
train.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [13]:
train.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase']

In [14]:
train.select('User_ID').show()

+-------+
|User_ID|
+-------+
|1000001|
|1000001|
|1000001|
|1000001|
|1000002|
|1000003|
|1000004|
|1000004|
|1000004|
|1000005|
|1000005|
|1000005|
|1000005|
|1000005|
|1000006|
|1000006|
|1000006|
|1000006|
|1000007|
|1000008|
+-------+
only showing top 20 rows



Distribution of categorical features: number of distinct Product_IDs in train and test

In [15]:
train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count()


(3631, 3491)

Number of Product_ID categories that appear in test but not in train, found with the subtract method

In [16]:
diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()  # number of distinct unseen Product_IDs

46

Transforming categorical variables to labels

In [40]:
from pyspark.ml.feature import StringIndexer
plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_ID_new')
labeller = plan_indexer.fit(train)

Above, we build a ‘labeller’ by applying the fit() method to the train DataFrame. We then use this labeller to transform both train and test: calling its transform() method adds the indexed column, and we store the results in Train1 and Test1.



In [41]:
Train1 = labeller.transform(train)
Test1 = labeller.transform(test)

In [42]:
Train1.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_ID_new|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|         766.0|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|         183.0|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|     

This successfully adds one transformed column, product_ID_new, to our train DataFrame.

### Feature selection

In [43]:
df = sqlContext.createDataFrame([
    (1.0, 1.0, "a"),
    (0.0, 2.0, "b"),
    (0.0, 0.0, "a")
], ["y", "x", "s"])

In [51]:
from pyspark.ml.feature import RFormula
rf = RFormula(formula="y ~ x + s")

In [52]:
rf.fit(df).transform(df).show()

+---+---+---+---------+-----+
|  y|  x|  s| features|label|
+---+---+---+---------+-----+
|1.0|1.0|  a|[1.0,1.0]|  1.0|
|0.0|2.0|  b|[2.0,0.0]|  0.0|
|0.0|0.0|  a|[0.0,1.0]|  0.0|
+---+---+---+---------+-----+



In [79]:
from pyspark.ml.feature import RFormula
formula = RFormula(formula="Purchase ~ Age + Occupation + City_Category + Stay_In_Current_City_Years + Product_Category_1 + Product_Category_2 + Gender", featuresCol="features", labelCol="label")

In [80]:
t1 = formula.fit(Train1)
train1 = t1.transform(Train1)
test1 = t1.transform(Test1)

In [81]:
train1.dtypes

[('User_ID', 'string'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'string'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'string'),
 ('Product_Category_1', 'string'),
 ('Product_Category_2', 'string'),
 ('Product_Category_3', 'string'),
 ('Purchase', 'string'),
 ('product_ID_new', 'double'),
 ('features', 'vector'),
 ('label', 'double')]

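Selecting label works, but showing features fails with the error below. The cause is not the Spark version: RFormula indexes string predictors with StringIndexer, and Product_Category_2 still contains nulls because fillna(-1) above does not fill string columns (see the describe() counts). The dataframe transformations themselves work without problems.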

In [89]:
train1.select('label').show()
train1.select('features').show()

+-------+
|  label|
+-------+
|11529.0|
| 2312.0|
| 5917.0|
| 9163.0|
|  245.0|
| 2246.0|
| 5247.0|
| 3251.0|
| 3012.0|
|  774.0|
|  614.0|
| 5830.0|
| 1525.0|
| 2777.0|
|  830.0|
| 6344.0|
|11581.0|
| 1422.0|
| 3371.0|
| 4244.0|
+-------+
only showing top 20 rows



Py4JJavaError: An error occurred while calling o1365.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 211.0 failed 1 times, most recent failure: Lost task 0.0 in stage 211.0 (TID 1295, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:213)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:208)
	... 16 more
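The root cause is a NULL in a column passed to `StringIndexer`. In PySpark the usual remedies are to replace the NULLs before indexing, for example `df.na.fill('unknown', subset=['some_column'])` (the column name is a placeholder), or, in Spark 2.2 and later, to build the indexer as `StringIndexer(inputCol=..., outputCol=..., handleInvalid='keep')` (or `'skip'`). The plain-Python sketch below is not Spark code; it only imitates how the three `handleInvalid` modes treat a missing value so the behaviour can be checked without a Spark session. It assumes the default most-frequent-first label order and breaks frequency ties alphabetically, which is an assumption rather than documented Spark 2.2 behaviour. `fit_labels` and `index_values` are hypothetical helpers.

```python
from collections import Counter

# Plain-Python imitation of StringIndexer's handleInvalid modes (not Spark code).
# Assumption: ties in frequency are broken alphabetically.

def fit_labels(values):
    """Order the non-NULL labels by descending frequency; NULLs are ignored when fitting."""
    counts = Counter(v for v in values if v is not None)
    return sorted(counts, key=lambda label: (-counts[label], label))

def index_values(values, labels, handle_invalid="error"):
    """Map values to float indices; NULL or unseen values follow handle_invalid."""
    lookup = {label: i for i, label in enumerate(labels)}
    indexed = []
    for v in values:
        if v in lookup:
            indexed.append(float(lookup[v]))
        elif handle_invalid == "keep":
            indexed.append(float(len(labels)))  # one extra index for NULL/unseen values
        elif handle_invalid == "skip":
            continue  # value omitted, as Spark drops the whole row
        else:
            # Spark raises a SparkException here; ValueError stands in for it
            raise ValueError("encountered NULL or unseen value: %r" % (v,))
    return indexed

city = ["1", "13", "1", None, "5", "1", "13"]  # hypothetical column values
labels = fit_labels(city)                        # ['1', '13', '5']
print(index_values(city, labels, "keep"))        # [0.0, 1.0, 0.0, 3.0, 2.0, 0.0, 1.0]
print(index_values(city, labels, "skip"))        # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

With `'keep'` the NULL becomes one extra category (index 3 here), so no rows are lost; `'skip'` silently shrinks the dataset, which matters if every user in the test set needs a prediction.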


### model: random forest

In [74]:
from pyspark.ml.regression import RandomForestRegressor
# defaults: featuresCol='features', labelCol='label', numTrees=20, maxDepth=5
rf = RandomForestRegressor()

In [75]:
# an (arbitrary) seed makes the 70/30 split reproducible across runs
(train_cv, test_cv) = train1.randomSplit([0.7, 0.3], seed=42)
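`randomSplit` normalizes the weights and assigns each row to exactly one split by a random draw, so the 70/30 proportions are approximate and differ between runs unless a `seed` is given. The sketch below reproduces that idea on a Python list; it is a simplification (Spark samples partition by partition), and `random_split` is a hypothetical helper, not a Spark function.

```python
import random
from bisect import bisect_right
from itertools import accumulate

def random_split(rows, weights, seed=None):
    """Assign each row to one split using a single uniform draw per row."""
    total = float(sum(weights))
    # cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        # split i covers [bounds[i-1], bounds[i]); min() guards against float round-off
        i = min(bisect_right(bounds, u), len(weights) - 1)
        splits[i].append(row)
    return splits

train_part, test_part = random_split(range(10000), [0.7, 0.3], seed=42)
print(len(train_part), len(test_part))  # roughly 7000 and 3000, not exactly
```

Because every row gets one draw, the splits are disjoint and together cover all rows; only their sizes vary.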

In [76]:
model1 = rf.fit(train_cv)
predictions = model1.transform(test_cv)

Py4JJavaError: An error occurred while calling o891.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 183.0 failed 1 times, most recent failure: Lost task 0.0 in stage 183.0 (TID 1216, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
	...
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	... 30 more
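The error is raised while `rf.fit` runs, not where the indexer was applied to the data. That is consistent with Spark's lazy evaluation: DataFrame transformations only record what to do, and the indexing UDF runs when an action (here, the forest reading its training rows) needs the output. A Python generator behaves the same way and is used below purely as an analogy; `index_or_fail` is a hypothetical stand-in for the indexer.

```python
def index_or_fail(value, index):
    """Hypothetical stand-in for the indexer UDF: fails on a missing value."""
    if value is None:
        raise ValueError("encountered NULL value")
    return index[value]

index = {"a": 0.0, "b": 1.0}
rows = ["a", "b", None]

# Like a DataFrame transformation: nothing is evaluated yet, so no error here.
indexed = (index_or_fail(v, index) for v in rows)

# Like an action (fit, count, take): consuming the rows triggers the failure.
try:
    list(indexed)
except ValueError as exc:
    print("raised only when consumed:", exc)  # prints: raised only when consumed: encountered NULL value
```

The practical consequence is that a data problem introduced early in the notebook can surface many cells later, so the fix belongs at the indexing step, not at `fit`.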


In [101]:
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
# compares the default 'label' and 'prediction' columns
evaluator = RegressionEvaluator(metricName="mse")
mse = evaluator.evaluate(predictions)
np.sqrt(mse), mse

NameError: name 'predictions' is not defined
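With `metricName="mse"`, `RegressionEvaluator` averages the squared difference between the label and the prediction, and the cell takes the square root to report RMSE alongside it. Since `is_churn` is 0/1, this MSE is the same quantity as the Brier score of the forest's predicted churn rate. A plain-Python version, with made-up numbers for illustration only:

```python
import math

def mse(labels, predictions):
    """Mean squared error, the quantity reported for metricName='mse'."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)

labels = [1, 0, 0, 1]               # illustrative is_churn values
predictions = [0.9, 0.2, 0.1, 0.6]  # illustrative forest outputs
err = mse(labels, predictions)
print(math.sqrt(err), err)          # RMSE, MSE
```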

In [102]:
model = rf.fit(train1)
predictions1 = model.transform(test1)

Py4JJavaError: An error occurred while calling o1379.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 163.0 failed 1 times, most recent failure: Lost task 0.0 in stage 163.0 (TID 1656, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
	...
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	... 29 more


In [103]:
# this dataset is keyed by msno; the target column is is_churn
df = predictions1.selectExpr("msno", "prediction as is_churn")

NameError: name 'predictions1' is not defined

In [104]:
# index=False: do not write pandas' row index as an extra column
df.toPandas().to_csv('submission.csv', index=False)

KeyboardInterrupt: 
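`toPandas()` collects every row onto the driver, which is slow for a table of this size and is a likely reason the cell had to be interrupted; `df.coalesce(1).write.csv(path, header=True)` keeps the write inside Spark (it produces a directory containing a single part file). Either way, the output should hold the same two columns as the training file, `msno` and `is_churn`, with no index column. A minimal plain-Python sketch of that format, assuming the (msno, prediction) pairs are already on the driver; `write_submission` is a hypothetical helper:

```python
import csv
import io

def write_submission(rows, handle):
    """Write (msno, predicted churn) pairs as CSV with a header and no index column."""
    writer = csv.writer(handle, lineterminator="\n")
    writer.writerow(["msno", "is_churn"])
    for msno, score in rows:
        writer.writerow([msno, float(score)])

buffer = io.StringIO()
write_submission([("user_a", 0.12), ("user_b", 0.87)], buffer)  # hypothetical ids
print(buffer.getvalue())
```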