In [1]:
sc

In [2]:
from pyspark.sql.types import StringType, StructField, StructType, TimestampType, ArrayType, IntegerType, LongType

event_schema = StructType([
    StructField("timestamp", StringType()),
    StructField("visitorid", IntegerType()),
    StructField("event", StringType()),
    StructField("itemid", IntegerType()),
    StructField("transactionid", IntegerType())
])

In [3]:
fp = "events.csv" 
#The csv and the ipython notebook are present in same location. Thus only specifying the file name is enough
df = spark.read.csv(fp, header = True, schema = event_schema)

In [4]:
df.show()

+-------------+---------+---------+------+-------------+
|    timestamp|visitorid|    event|itemid|transactionid|
+-------------+---------+---------+------+-------------+
|1433221332117|   257597|     view|355908|         null|
|1433224214164|   992329|     view|248676|         null|
|1433221999827|   111016|     view|318965|         null|
|1433221955914|   483717|     view|253185|         null|
|1433221337106|   951259|     view|367447|         null|
|1433224086234|   972639|     view| 22556|         null|
|1433221923240|   810725|     view|443030|         null|
|1433223291897|   794181|     view|439202|         null|
|1433220899221|   824915|     view|428805|         null|
|1433221204592|   339335|     view| 82389|         null|
|1433222162373|   176446|     view| 10572|         null|
|1433221701252|   929206|     view|410676|         null|
|1433224229496|    15795|     view| 44872|         null|
|1433223697356|   598426|     view|156489|         null|
|1433224078165|   223343|     v

In [5]:
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- transactionid: integer (nullable = true)



In [6]:
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [7]:
format = "yyyy-MM-dd HH:mm:ss"
df = df.withColumn('timestamp2', f.from_unixtime(df.timestamp/1000).cast(TimestampType()))


In [8]:
df.show()

+-------------+---------+---------+------+-------------+-------------------+
|    timestamp|visitorid|    event|itemid|transactionid|         timestamp2|
+-------------+---------+---------+------+-------------+-------------------+
|1433221332117|   257597|     view|355908|         null|2015-06-02 10:32:12|
|1433224214164|   992329|     view|248676|         null|2015-06-02 11:20:14|
|1433221999827|   111016|     view|318965|         null|2015-06-02 10:43:19|
|1433221955914|   483717|     view|253185|         null|2015-06-02 10:42:35|
|1433221337106|   951259|     view|367447|         null|2015-06-02 10:32:17|
|1433224086234|   972639|     view| 22556|         null|2015-06-02 11:18:06|
|1433221923240|   810725|     view|443030|         null|2015-06-02 10:42:03|
|1433223291897|   794181|     view|439202|         null|2015-06-02 11:04:51|
|1433220899221|   824915|     view|428805|         null|2015-06-02 10:24:59|
|1433221204592|   339335|     view| 82389|         null|2015-06-02 10:30:04|

In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
df = df.drop('timestamp')

In [11]:
df = df.withColumnRenamed('timestamp2', 'timestamp')
df.show()

+---------+---------+------+-------------+-------------------+
|visitorid|    event|itemid|transactionid|          timestamp|
+---------+---------+------+-------------+-------------------+
|   257597|     view|355908|         null|2015-06-02 10:32:12|
|   992329|     view|248676|         null|2015-06-02 11:20:14|
|   111016|     view|318965|         null|2015-06-02 10:43:19|
|   483717|     view|253185|         null|2015-06-02 10:42:35|
|   951259|     view|367447|         null|2015-06-02 10:32:17|
|   972639|     view| 22556|         null|2015-06-02 11:18:06|
|   810725|     view|443030|         null|2015-06-02 10:42:03|
|   794181|     view|439202|         null|2015-06-02 11:04:51|
|   824915|     view|428805|         null|2015-06-02 10:24:59|
|   339335|     view| 82389|         null|2015-06-02 10:30:04|
|   176446|     view| 10572|         null|2015-06-02 10:46:02|
|   929206|     view|410676|         null|2015-06-02 10:38:21|
|    15795|     view| 44872|         null|2015-06-02 11

In [12]:
df.show()

+---------+---------+------+-------------+-------------------+
|visitorid|    event|itemid|transactionid|          timestamp|
+---------+---------+------+-------------+-------------------+
|   257597|     view|355908|         null|2015-06-02 10:32:12|
|   992329|     view|248676|         null|2015-06-02 11:20:14|
|   111016|     view|318965|         null|2015-06-02 10:43:19|
|   483717|     view|253185|         null|2015-06-02 10:42:35|
|   951259|     view|367447|         null|2015-06-02 10:32:17|
|   972639|     view| 22556|         null|2015-06-02 11:18:06|
|   810725|     view|443030|         null|2015-06-02 10:42:03|
|   794181|     view|439202|         null|2015-06-02 11:04:51|
|   824915|     view|428805|         null|2015-06-02 10:24:59|
|   339335|     view| 82389|         null|2015-06-02 10:30:04|
|   176446|     view| 10572|         null|2015-06-02 10:46:02|
|   929206|     view|410676|         null|2015-06-02 10:38:21|
|    15795|     view| 44872|         null|2015-06-02 11

In [13]:
df.printSchema()

root
 |-- visitorid: integer (nullable = true)
 |-- event: string (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- transactionid: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [14]:
dfs = df.sample(fraction= 0.005)

In [15]:
dfs.show()

+---------+-----+------+-------------+-------------------+
|visitorid|event|itemid|transactionid|          timestamp|
+---------+-----+------+-------------+-------------------+
|   223343| view|402625|         null|2015-06-02 11:17:58|
|    77112| view| 97879|         null|2015-06-02 11:09:47|
|   572783| view| 73593|         null|2015-06-02 09:50:15|
|  1360339| view|381653|         null|2015-06-02 09:32:07|
|   363915| view|260256|         null|2015-06-02 09:46:29|
|   518659| view|426298|         null|2015-06-01 21:58:18|
|   197504| view| 45732|         null|2015-06-02 00:02:59|
|   718038| view|345041|         null|2015-06-02 04:34:52|
|   310533| view| 93558|         null|2015-06-02 05:09:04|
|     3215| view|220165|         null|2015-06-02 07:45:54|
|   861030| view| 59866|         null|2015-06-02 01:36:52|
|   231771| view|284397|         null|2015-06-02 04:04:36|
|   162446| view|407866|         null|2015-06-02 08:01:38|
|   528124| view|320732|         null|2015-06-02 08:30:0

In [16]:
from pyspark.sql.functions import count
dfsgb = dfs.groupBy(["visitorid", "itemid"]).agg(count("timestamp").alias("interactions"))

In [17]:
dfsgb.show()

+---------+------+------------+
|visitorid|itemid|interactions|
+---------+------+------------+
|   622975| 96545|           1|
|   346361|266085|           1|
|   885601|189751|           1|
|  1299683|234366|           1|
|   366618|250776|           1|
|   158533|389135|           1|
|  1177303| 95977|           1|
|  1046108|374698|           1|
|  1161342|205139|           1|
|  1242816|388242|           1|
|   682031|435624|           1|
|   136928|393744|           1|
|   254418|295144|           1|
|   122415|231009|           1|
|   299594|147274|           1|
|  1302193|210472|           1|
|   150285|441588|           1|
|  1090667|256721|           1|
|   775034|143063|           1|
|   286189|229884|           1|
+---------+------+------------+
only showing top 20 rows



In [18]:
from pyspark.sql.functions import max, min
dfsgb.agg(max("interactions")).show()

+-----------------+
|max(interactions)|
+-----------------+
|                3|
+-----------------+



In [19]:
dfsui = dfs.groupby("visitorid").agg(count("timestamp").alias("total_user_interaction_count"))

In [20]:
dfsui.agg(max("total_user_interaction_count")).show()

+---------------------------------+
|max(total_user_interaction_count)|
+---------------------------------+
|                               40|
+---------------------------------+



In [21]:
from pyspark.sql.functions import col, countDistinct
dfs.agg(countDistinct("visitorid")).show()

+-------------------------+
|count(DISTINCT visitorid)|
+-------------------------+
|                    13238|
+-------------------------+



In [22]:
dfs.agg(countDistinct("itemid")).show()

+----------------------+
|count(DISTINCT itemid)|
+----------------------+
|                 11379|
+----------------------+



In [23]:
type(dfsgb)

pyspark.sql.dataframe.DataFrame

In [24]:
from pyspark.sql.functions import *

df1 = dfsgb.alias('df1')
df2 = dfsui.alias('df2')

In [25]:
dfr = df1.join(df2, df1.visitorid == df2.visitorid).select('df1.*', 'df2.total_user_interaction_count')

In [26]:
dfr = dfr.withColumn('rui', dfr.interactions/dfr.total_user_interaction_count)

In [27]:
dfr.agg(min("rui")).show()

+--------+
|min(rui)|
+--------+
|   0.025|
+--------+



In [28]:
from pyspark.sql.functions import mean
dfr.agg(mean("rui")).show()

+------------------+
|          avg(rui)|
+------------------+
|0.9540213317959066|
+------------------+



In [29]:
dfr.approxQuantile("rui", [0.1, 0.2, 0.3, 0.4, 0.5], 0.25)

[0.025, 0.025, 1.0, 1.0, 1.0]

In [30]:
dfs.groupBy("event").count().show()

+-----------+-----+
|      event|count|
+-----------+-----+
|transaction|  110|
|  addtocart|  348|
|       view|13465|
+-----------+-----+



In [31]:
def assign_ratings(e):
    
    rating = 0
    if e == "view":
        rating = 2
    else:
        rating = 5
    
    return rating

In [32]:
from pyspark.sql import Row
dfs.select("event").rdd.map(lambda x : assign_ratings(x[0])).map(lambda x: Row(x)).toDF(["ratings"]).show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 1 times, most recent failure: Lost task 0.0 in stage 44.0 (TID 2321, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 14 more


In [33]:
from pyspark.sql.functions import col, when
dfs = dfs.withColumn("ratings", when(col("event")=='view', 2).otherwise(5))

In [34]:
dfs.rdd.countApproxDistinct()

Py4JJavaError: An error occurred while calling o253.countApproxDistinct.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 45.0 failed 1 times, most recent failure: Lost task 3.0 in stage 45.0 (TID 2325, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1124)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1117)
	at org.apache.spark.rdd.RDD$$anonfun$countApproxDistinct$1.apply$mcJ$sp(RDD.scala:1263)
	at org.apache.spark.rdd.RDD$$anonfun$countApproxDistinct$1.apply(RDD.scala:1258)
	at org.apache.spark.rdd.RDD$$anonfun$countApproxDistinct$1.apply(RDD.scala:1258)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.countApproxDistinct(RDD.scala:1258)
	at org.apache.spark.rdd.RDD$$anonfun$countApproxDistinct$2.apply$mcJ$sp(RDD.scala:1287)
	at org.apache.spark.rdd.RDD$$anonfun$countApproxDistinct$2.apply(RDD.scala:1284)
	at org.apache.spark.rdd.RDD$$anonfun$countApproxDistinct$2.apply(RDD.scala:1284)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.countApproxDistinct(RDD.scala:1284)
	at org.apache.spark.api.java.JavaRDDLike$class.countApproxDistinct(JavaRDDLike.scala:693)
	at org.apache.spark.api.java.AbstractJavaRDDLike.countApproxDistinct(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 17 more
