In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('jlin787_I4').getOrCreate()

In [2]:
df = spark.read.load('./SeoulBikeData5.csv',format = "csv", header="true")
w = spark.read.load('./sbd.csv',format = "csv", header="true")

In [3]:
df.show()
w.show()

+---------+---------------+----+-----------+--------+---------+-----------+-------------------+---------------+--------+---------+-------+----------+--------------+------------+
|     Date|RentedBikeCount|Hour|Temperature|Humidity|WindSpeed|Visibility |DewPointTemperature|SolarRadiation |Rainfall|Snowfall |Seasons|   Holiday|FunctioningDay|DayoftheWeek|
+---------+---------------+----+-----------+--------+---------+-----------+-------------------+---------------+--------+---------+-------+----------+--------------+------------+
|1/12/2017|            254|   0|       -5.2|      37|      2.2|       2000|              -17.6|              0|       0|        0| Winter|No Holiday|           Yes|      Friday|
|1/12/2017|            204|   1|       -5.5|      38|      0.8|       2000|              -17.6|              0|       0|        0| Winter|No Holiday|           Yes|      Friday|
|1/12/2017|            173|   2|         -6|      39|        1|       2000|              -17.7|              0

In [4]:
w1 = df.join(w,on='Date')
w1.show(5)

+---------+---------------+----+-----------+--------+---------+-----------+-------------------+---------------+--------+---------+-------+----------+--------------+------------+-----+
|     Date|RentedBikeCount|Hour|Temperature|Humidity|WindSpeed|Visibility |DewPointTemperature|SolarRadiation |Rainfall|Snowfall |Seasons|   Holiday|FunctioningDay|DayoftheWeek|Month|
+---------+---------------+----+-----------+--------+---------+-----------+-------------------+---------------+--------+---------+-------+----------+--------------+------------+-----+
|1/12/2017|            254|   0|       -5.2|      37|      2.2|       2000|              -17.6|              0|       0|        0| Winter|No Holiday|           Yes|      Friday|   12|
|1/12/2017|            254|   0|       -5.2|      37|      2.2|       2000|              -17.6|              0|       0|        0| Winter|No Holiday|           Yes|      Friday|   12|
|1/12/2017|            254|   0|       -5.2|      37|      2.2|       2000|     

In [5]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.feature import StringIndexer

In [6]:
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType,BooleanType,DoubleType,FloatType)

In [7]:
data_schema = [StructField('Date',StringType(),True),
               StructField('RentedBikeCount',IntegerType(),True),
               StructField('Hour',IntegerType(),True),
               StructField('Temperature',FloatType(),True),
               StructField('Humidity',IntegerType(),True),
               StructField('WindSpeed',FloatType(),True),
               StructField('Visibility ',IntegerType(),True),
               StructField('DewPointTemperature',FloatType(),True),
               StructField('SolarRadiation ',FloatType(),True),
               StructField('Rainfall',FloatType(),True),
              StructField('Snowfall ',FloatType(),True),
              StructField('Seasons',StringType(),True),
              StructField(' Holiday',StringType(),True),
              StructField('FunctioningDay',StringType(),True),
              StructField('DayoftheWeek',StringType(),True),
              StructField('Month',DoubleType(),True)]


final_struct = StructType(fields=data_schema)

In [8]:
df2 = spark.read.csv('SeoulBikeData5.csv', schema=final_struct, header= True)
df2.printSchema()

root
 |-- Date: string (nullable = true)
 |-- RentedBikeCount: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Temperature: float (nullable = true)
 |-- Humidity: integer (nullable = true)
 |-- WindSpeed: float (nullable = true)
 |-- Visibility : integer (nullable = true)
 |-- DewPointTemperature: float (nullable = true)
 |-- SolarRadiation : float (nullable = true)
 |-- Rainfall: float (nullable = true)
 |-- Snowfall : float (nullable = true)
 |-- Seasons: string (nullable = true)
 |--  Holiday: string (nullable = true)
 |-- FunctioningDay: string (nullable = true)
 |-- DayoftheWeek: string (nullable = true)
 |-- Month: double (nullable = true)



In [9]:
df2.count()

8760

In [10]:
from pyspark.sql.functions import col,regexp_replace

In [11]:
df3=df2.withColumn("Dayoftheweek",regexp_replace(df2["Dayoftheweek"],"Wednesday ",'Wednesday'))
df4=df3.withColumn("Dayoftheweek",regexp_replace(df3["Dayoftheweek"],"Tuesday ",'Tuesday'))
df5=df4.withColumn("Dayoftheweek",regexp_replace(df4["Dayoftheweek"],"Saturday ",'Saturday'))

In [12]:
from pyspark.sql import functions as F

Seasons = df5.select("Seasons").distinct().rdd.flatMap(lambda x: x).collect()
FunctioningDay = df5.select("FunctioningDay").distinct().rdd.flatMap(lambda x: x).collect()
Holiday = df5.select(" Holiday").distinct().rdd.flatMap(lambda x: x).collect()
Dayoftheweek = df5.select("Dayoftheweek").distinct().rdd.flatMap(lambda x: x).collect()

Seasons_expr = [F.when(F.col("Seasons") == s, 1).otherwise(0).alias("Season_" + s) for s in Seasons]
FunctioningDay_expr = [F.when(F.col("FunctioningDay") == f, 1).otherwise(0).alias("FunctioningDay_" + f) for f in FunctioningDay]
Holiday_expr = [F.when(F.col(" Holiday") == h, 1).otherwise(0).alias("Holiday_" + h) for h in Holiday]
Dayoftheweek_expr = [F.when(F.col("Dayoftheweek") == d, 1).otherwise(0).alias("Dayoftheweek_" + d) for d in Dayoftheweek]

df6= df5.select( 'RentedBikeCount','Date','Hour','Temperature','Humidity','WindSpeed','Visibility ',
                           'DewPointTemperature','SolarRadiation ','Rainfall','Snowfall ','Month','Seasons','FunctioningDay',' Holiday','Dayoftheweek', *Seasons_expr+FunctioningDay_expr+Holiday_expr+Dayoftheweek_expr)
df6.show(5)

+---------------+---------+----+-----------+--------+---------+-----------+-------------------+---------------+--------+---------+-----+-------+--------------+----------+------------+-------------+-------------+-------------+-------------+-----------------+------------------+------------------+---------------+----------------------+--------------------+-------------------+---------------------+---------------------+-------------------+-------------------+
|RentedBikeCount|     Date|Hour|Temperature|Humidity|WindSpeed|Visibility |DewPointTemperature|SolarRadiation |Rainfall|Snowfall |Month|Seasons|FunctioningDay|   Holiday|Dayoftheweek|Season_Spring|Season_Summer|Season_Autumn|Season_Winter|FunctioningDay_No|FunctioningDay_Yes|Holiday_No Holiday|Holiday_Holiday|Dayoftheweek_Wednesday|Dayoftheweek_Tuesday|Dayoftheweek_Friday|Dayoftheweek_Thursday|Dayoftheweek_Saturday|Dayoftheweek_Monday|Dayoftheweek_Sunday|
+---------------+---------+----+-----------+--------+---------+-----------+-----

In [13]:
df7= df6.drop('Seasons','FunctioningDay',' Holiday','Dayoftheweek')
df7.show(5)
#df7.show()

+---------------+---------+----+-----------+--------+---------+-----------+-------------------+---------------+--------+---------+-----+-------------+-------------+-------------+-------------+-----------------+------------------+------------------+---------------+----------------------+--------------------+-------------------+---------------------+---------------------+-------------------+-------------------+
|RentedBikeCount|     Date|Hour|Temperature|Humidity|WindSpeed|Visibility |DewPointTemperature|SolarRadiation |Rainfall|Snowfall |Month|Season_Spring|Season_Summer|Season_Autumn|Season_Winter|FunctioningDay_No|FunctioningDay_Yes|Holiday_No Holiday|Holiday_Holiday|Dayoftheweek_Wednesday|Dayoftheweek_Tuesday|Dayoftheweek_Friday|Dayoftheweek_Thursday|Dayoftheweek_Saturday|Dayoftheweek_Monday|Dayoftheweek_Sunday|
+---------------+---------+----+-----------+--------+---------+-----------+-------------------+---------------+--------+---------+-----+-------------+-------------+----------

In [14]:
print(df7.count(),len(df7.columns))

8760 27


In [15]:
#rename the columns
from pyspark.sql.functions import col

mapping = dict(zip(['Visibility ', 'SolarRadiation ', 'Snowfall ','Month', 'Holiday_No Holiday', 'Holiday_Holiday', 
                    'Dayoftheweek_Wednesday', 'Dayoftheweek_Tuesday', 'Dayoftheweek_Friday',
                   'Dayoftheweek_Thursday', 'Dayoftheweek_Saturday', 'Dayoftheweek_Monday','Dayoftheweek_Sunday',
                    'Season_Spring', 'Season_Summer', 'Season_Autumn','Season_Winter'], 
                   ['Visibility', 'SolarRadiation', 'Snowfall','Months', 'NoHoliday', 'Holiday', 
                    'Wednesday', 'Tuesday', 'Friday',
                   'Thursday', 'Saturday', 'Monday','Sunday',
                    'Spring', 'Summer', 'Autumn','Winter']))

df8 = df7.select([col(c).alias(mapping.get(c, c)) for c in df7.columns])
df8.show(1)

print(df8.count(),len(df8.columns))

+---------------+---------+----+-----------+--------+---------+----------+-------------------+--------------+--------+--------+------+------+------+------+------+-----------------+------------------+---------+-------+---------+-------+------+--------+--------+------+------+
|RentedBikeCount|     Date|Hour|Temperature|Humidity|WindSpeed|Visibility|DewPointTemperature|SolarRadiation|Rainfall|Snowfall|Months|Spring|Summer|Autumn|Winter|FunctioningDay_No|FunctioningDay_Yes|NoHoliday|Holiday|Wednesday|Tuesday|Friday|Thursday|Saturday|Monday|Sunday|
+---------------+---------+----+-----------+--------+---------+----------+-------------------+--------------+--------+--------+------+------+------+------+------+-----------------+------------------+---------+-------+---------+-------+------+--------+--------+------+------+
|            254|1/12/2017|   0|       -5.2|      37|      2.2|      2000|              -17.6|           0.0|     0.0|     0.0|  null|     0|     0|     0|     1|             

In [16]:
#drop 
df9 = df8.drop('Months')
df9.show(1)

+---------------+---------+----+-----------+--------+---------+----------+-------------------+--------------+--------+--------+------+------+------+------+-----------------+------------------+---------+-------+---------+-------+------+--------+--------+------+------+
|RentedBikeCount|     Date|Hour|Temperature|Humidity|WindSpeed|Visibility|DewPointTemperature|SolarRadiation|Rainfall|Snowfall|Spring|Summer|Autumn|Winter|FunctioningDay_No|FunctioningDay_Yes|NoHoliday|Holiday|Wednesday|Tuesday|Friday|Thursday|Saturday|Monday|Sunday|
+---------------+---------+----+-----------+--------+---------+----------+-------------------+--------------+--------+--------+------+------+------+------+-----------------+------------------+---------+-------+---------+-------+------+--------+--------+------+------+
|            254|1/12/2017|   0|       -5.2|      37|      2.2|      2000|              -17.6|           0.0|     0.0|     0.0|     0|     0|     0|     1|                0|                 1|    

In [17]:
print(df9.count(),len(df9.columns))

8760 26


In [18]:
!pip3 install pyspark

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/pip3", line 7, in <module>
    from pip._internal.cli.main import main
  File "/home/ubuntu/.local/lib/python3.5/site-packages/pip/_internal/cli/main.py", line 58
    sys.stderr.write(f"ERROR: {exc}")
                                   ^
SyntaxError: invalid syntax


In [19]:
spark.version

'2.1.1'

In [20]:
import pyspark.sql. functions as f
! pip3 install --user  scikit-learn

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/pip3", line 7, in <module>
    from pip._internal.cli.main import main
  File "/home/ubuntu/.local/lib/python3.5/site-packages/pip/_internal/cli/main.py", line 58
    sys.stderr.write(f"ERROR: {exc}")
                                   ^
SyntaxError: invalid syntax


In [21]:
import pyspark
from pyspark.sql import SparkSession

In [22]:
from pyspark import since, SparkContext
from pyspark.ml.common import _java2py, _py2java
from pyspark.ml.wrapper import _jvm
import sys

from pyspark import since, SparkContext
from pyspark.ml.common import _java2py, _py2java
from pyspark.ml.wrapper import JavaWrapper, _jvm
from pyspark.sql.column import Column, _to_seq
from pyspark.sql.functions import lit


In [23]:
import sys
import array
import struct

In [24]:
import pyspark.sql.functions as F
from pyspark.ml.linalg import VectorUDT, DenseVector


In [25]:
import sys
if sys.version >= '3':
    basestring = str

from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import Matrix, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat.test import ChiSqTestResult, KolmogorovSmirnovTestResult

In [26]:
__all__ = ['MultivariateStatisticalSummary', 'Statistics']


In [27]:
from __future__ import print_function
import numpy as np
from pyspark import SparkContext,SparkConf
from pyspark.mllib.stat import Statistics


In [None]:
!sudo apt-get install python-seaborn

In [None]:
!sudo apt-get install python-sklearn

In [42]:
py -m pip install --upgrade build

SyntaxError: invalid syntax (<ipython-input-42-68771a84ff12>, line 1)

In [37]:
import joblib
import pandas as pd
from sklearn.datasets import make_blobs
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score


ImportError: No module named 'joblib'

In [35]:
import pandas as pd
from sklearn.utils import resample

ImportError: No module named 'sklearn'

In [29]:
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.stat import Statistics

features = df9.rdd.map(lambda row: row[0:])


corr_mat=Statistics.corr(features, method="pearson")
corr_mat.show()

conf = SparkConf().setAppName("jlin787_I43").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

rdd1 = sc.parallelize(corr_mat)
rdd2 = rdd1.map(lambda x: [float(i) for i in x])
df9= rdd2.toDF(['RentedBikeCount','Hour','Temperature','Humidity','WindSpeed',
                'Visibility','DewPointTemperature','SolarRadiation','Rainfall','Snowfall',
                'Spring','Summer','Autumn','Winter','FunctioningDay_No',
                'FunctioningDay_Yes','NoHoliday','Holiday','Wednesday',
                'Tuesday','Friday','Thursday','Saturday','Monday','Sunday'])
df9.select('RentedBikeCount').show(30)
print(len(df9.columns))\

Py4JJavaError: An error occurred while calling o271.corr.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 824, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 74, in _convert_to_vector
    return DenseVector(l)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 289, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: '1/12/2017'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1368)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.numCols(RowMatrix.scala:61)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:331)
	at org.apache.spark.mllib.stat.correlation.PearsonCorrelation$.computeCorrelationMatrix(PearsonCorrelation.scala:49)
	at org.apache.spark.mllib.stat.correlation.Correlations$.corrMatrix(Correlation.scala:66)
	at org.apache.spark.mllib.stat.Statistics$.corr(Statistics.scala:74)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.corr(PythonMLLibAPI.scala:842)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 74, in _convert_to_vector
    return DenseVector(l)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 289, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: '1/12/2017'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [30]:
def compute_correlation_matrix(df,method='pearson'):
    df_rdd = df.rdd.map(lambda row:row[0:])
    corr_mat = Statistics.corr(df_rdd, method=method)
    corr_mat_df = pd.DataFrame(corr_mat,
                                columns=d.columns,
                                index=d.columns)

df10=compute_correlation_matrix(df9)

Py4JJavaError: An error occurred while calling o327.corr.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 1 times, most recent failure: Lost task 0.0 in stage 29.0 (TID 825, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 74, in _convert_to_vector
    return DenseVector(l)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 289, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: '1/12/2017'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1368)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.numCols(RowMatrix.scala:61)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:331)
	at org.apache.spark.mllib.stat.correlation.PearsonCorrelation$.computeCorrelationMatrix(PearsonCorrelation.scala:49)
	at org.apache.spark.mllib.stat.correlation.Correlations$.corrMatrix(Correlation.scala:66)
	at org.apache.spark.mllib.stat.Statistics$.corr(Statistics.scala:74)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.corr(PythonMLLibAPI.scala:842)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 74, in _convert_to_vector
    return DenseVector(l)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 289, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: '1/12/2017'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [31]:
df9 = df9.RentedBikeCount.sort_values(ascending=False)
df9

TypeError: 'Column' object is not callable

In [32]:
import pandas as pd
from sklearn.utils import resample

ImportError: No module named 'sklearn'

In [33]:
from __future__ import print_function

import numpy as np

from pyspark import SparkContext
# $example on$
from pyspark.mllib.stat import Statistics
# $example off$

if __name__ == "__main__":
    sc = SparkContext(appName="jlin787_I42")  # SparkContext

    # $example on$
    seriesX = sc.parallelize(["RentedBikeCount"])  # a series
    # seriesY must have the same number of partitions and cardinality as seriesX
    seriesY = sc.parallelize(["Rainfall"])

    # Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

    df9 = sc.parallelize(
        [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
    )  # an RDD of Vectors

    # calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print(Statistics.corr(data, method="pearson"))
    # $example off$

    sc.stop()


ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=jlin787_I4, master=local[*]) created by getOrCreate at <ipython-input-1-1091d14704d2>:5 

In [1]:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
import seaborn as sns

ImportError: No module named 'pyspark'

In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.mllib.stat.test import ChiSqTestResult, KolmogorovSmirnovTestResult
from pyspark.mllib.stat.test import Correlation

ImportError: No module named 'pyspark'

In [3]:
from pyspark.mllib.stat import Statistics
import pandas as pd

ImportError: No module named 'pyspark'

In [4]:
!pip install seaborn

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/pip", line 7, in <module>
    from pip._internal.cli.main import main
  File "/home/ubuntu/.local/lib/python3.5/site-packages/pip/_internal/cli/main.py", line 58
    sys.stderr.write(f"ERROR: {exc}")
                                   ^
SyntaxError: invalid syntax


In [5]:
from pip import __main__
if __name__ == '__main__':
    sys.exit(__main__._main())

NameError: name 'sys' is not defined

In [6]:
from pyspark.mllib.stat._statistics import *

ImportError: No module named 'pyspark'

In [7]:
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.mllib.stat._statistics import Correlation

ImportError: No module named 'pyspark'

In [8]:
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.stat import Statistics

features = df9.rdd.map(lambda row: row[0:])

sc = SparkContext.getOrCreate(conf)
corr_mat=Statistics.corr(features, method="pearson")
corr_mat.show()

conf = SparkConf().setAppName("jlin787_I43").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

rdd1 = sc.parallelize(corr_mat)
rdd2 = rdd1.map(lambda x: [float(i) for i in x])
df9= rdd2.toDF(['RentedBikeCount','Hour','Temperature','Humidity','WindSpeed',
                'Visibility','DewPointTemperature','SolarRadiation','Rainfall','Snowfall',
                'Spring','Summer','Autumn','Winter','FunctioningDay_No',
                'FunctioningDay_Yes','NoHoliday','Holiday','Wednesday',
                'Tuesday','Friday','Thursday','Saturday','Monday','Sunday'])
df9.select('RentedBikeCount').show(30)
print(len(df9.columns))\

ImportError: No module named 'pyspark'

In [9]:
!pip3 install --upgrade pip

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/pip3", line 7, in <module>
    from pip._internal.cli.main import main
  File "/home/ubuntu/.local/lib/python3.5/site-packages/pip/_internal/cli/main.py", line 58
    sys.stderr.write(f"ERROR: {exc}")
                                   ^
SyntaxError: invalid syntax


In [10]:
!pip3 install --user  scikit-learn

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/pip3", line 7, in <module>
    from pip._internal.cli.main import main
  File "/home/ubuntu/.local/lib/python3.5/site-packages/pip/_internal/cli/main.py", line 58
    sys.stderr.write(f"ERROR: {exc}")
                                   ^
SyntaxError: invalid syntax


In [11]:
import pandas as pd
from sklearn.utils import resample

ImportError: No module named 'sklearn'

In [None]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

In [None]:
df.columns

In [None]:
df.describe().show()

In [12]:
df.printSchema()

NameError: name 'df' is not defined

In [13]:
df.na.drop()

NameError: name 'df' is not defined

In [None]:
df.na.drop().show()

In [14]:
df.count()

NameError: name 'df' is not defined

In [None]:
df.na.drop().count()

In [15]:
df1 = df.na.drop()
df1.count()

NameError: name 'df' is not defined

In [16]:
#fill the variables of the null value 
df.na.fill("NO VALUE", subset = ['Rainfall']).show()

NameError: name 'df' is not defined

In [17]:
print(df.count(),len(df.columns))

NameError: name 'df' is not defined

In [None]:
import pyspark.sql.functions as f
!pip3 install --user scikit-learn

In [18]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.feature import StringIndexer

ImportError: No module named 'pyspark'

In [19]:
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType,BooleanType,DoubleType,FloatType)

ImportError: No module named 'pyspark'

In [20]:
data_schema = [StructField('Date',StringType(),True),
               StructField('RentedBikeCount',IntegerType(),True),
               StructField('Hour',IntegerType(),True),
               StructField('Temperature',FloatType(),True),
               StructField('Humidity',IntegerType(),True),
               StructField('WindSpeed',FloatType(),True),
               StructField('Visibility ',IntegerType(),True),
               StructField('DewPointTemperature',FloatType(),True),
               StructField('SolarRadiation ',FloatType(),True),
               StructField('Rainfall',FloatType(),True),
              StructField('Snowfall ',FloatType(),True),
              StructField('Seasons',StringType(),True),
              StructField(' Holiday',StringType(),True),
              StructField('FunctioningDay',StringType(),True),
              StructField('DayoftheWeek',StringType(),True)]


final_struct = StructType(fields=data_schema)

NameError: name 'StructField' is not defined

In [21]:
df2 = spark.read.csv('SeoulBikeData5.csv', schema=final_struct, header= True)
df2.printSchema()

NameError: name 'spark' is not defined

In [22]:
df2.select('Date', 'RentedBikeCount','Hour','Temperature','Humidity', 'WindSpeed', 'Visibility ', 'DewPointTemperature', 'SolarRadiation ', 'Rainfall','Snowfall ', 'Seasons', ' Holiday', 'FunctioningDay', 'DayoftheWeek').show()

NameError: name 'df2' is not defined

In [23]:
from pyspark.sql.functions import dayofmonth,month,hour,year,format_number

ImportError: No module named 'pyspark'

In [24]:
df2.describe('RentedBikeCount','Temperature','Humidity','WindSpeed','Visibility ','DewPointTemperature','SolarRadiation ','Rainfall','Snowfall ').show()

NameError: name 'df2' is not defined

In [25]:
df2.describe('Temperature','Humidity','WindSpeed','Visibility ','DewPointTemperature','SolarRadiation ','Rainfall','Snowfall ').show()

NameError: name 'df2' is not defined

In [None]:
import matplotlib.pyplot as plt
import pandas 

In [None]:
x = df2.toPandas()["DayoftheWeek"].values.tolist()

In [None]:
y = df2.toPandas()["RentedBikeCount"].values.tolist()

In [None]:
plt.bar(x,y)
plt.show()

In [None]:
df2.select('RentedBikeCount').toPandas().hist()

In [None]:
type(df2)

In [None]:
import numpy as np
def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(ser)

In [None]:
#drop fbs
df3 = df2.drop("fbs")
df3.show()
df2.show()

In [None]:
from pyspark.sql import functions as f


class Outlier():

    def __init__(self, df):
        self.df = df


    def _calculate_bounds(self):
        bounds = {
            c: dict(
                zip(["q1", "q3"], self.df.approxQuantile(c, [0.25, 0.75], 0))
            )
            for c, d in zip(self.df.columns, self.df.dtypes) if d[1] in ["Rainfall", "Snowfall"]
        }

        for c in bounds:
            iqr = bounds[c]['q3'] - bounds[c]['q1']
            bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
            bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)

        return bounds


    def _flag_outliers_df(self):
        bounds = self._calculate_bounds()

        outliers_col = [
            f.when(
                ~f.col(c).between(bounds[c]['min'], bounds[c]['max']),
                f.col(c)
            ).alias(c + '_outlier')
            for c in bounds]

        return self.df.select(*outliers_col)


    def show_outliers(self):

        outlier_df = self._flag_outliers_df()

        for outlier in outlier_df.columns:
            outlier_df.select(outlier).filter(f.col(outlier).isNotNull()).show()

In [None]:
Outlier(df).show_outliers()

In [None]:
import matplotlib.pyplot as plt
plt.boxplot('Rainfall',data = df2.toPandas())
df2.groupBy('Rainfall').count().show()

In [None]:
plt.boxplot('WindSpeed',data = df2.toPandas())
df2.groupBy('WindSpeed').count().show()

In [None]:
plt.boxplot('Temperature',data = df2.toPandas())
df2.groupBy('Temperature').count().show()

In [None]:
plt.boxplot('SolarRadiation ',data = df2.toPandas())
df2.groupBy('SolarRadiation ').count().show()

In [None]:
plt.boxplot('Humidity',data = df2.toPandas())
df2.groupBy('Humidity').count().show()

In [None]:
plt.boxplot('DewPointTemperature',data = df2.toPandas())
df2.groupBy('DewPointTemperature').count().show()

In [None]:
plt.boxplot('Visibility ',data = df2.toPandas())
df2.groupBy('Visibility ').count().show()

In [None]:
plt.boxplot('Snowfall ',data = df2.toPandas())
df2.groupBy('Snowfall ').count().show()

In [None]:
from pyspark.sql.functions import col
df.describe().filter(col("summary") == "count").show()

In [None]:
from pyspark.sql.functions import lit

row = df.count()
summary = df.describe().filter(col("summary")=="count")
summary.select(*((lit(row)-col(c)).alias(c) for c in df.columns)).show()

In [None]:
import numpy as np
def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(ser)

In [None]:
bounds = {
    c: dict(
        zip(["q1", "q3"], df2.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in df2.columns
}

In [None]:
for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

In [None]:

df.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in df2.columns
    ]
).show()

In [None]:
from pyspark.sql import SparkSession

In [None]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt

In [None]:
import pyspark.sql.functions as F

In [None]:
IQRdf = df2.groupby('Rainfall').agg(F.expr('percentile(duration, array(0.25))')[0].alias('lower_quartile'), F.expr('percentile(duration, array(0.75))')[0].alias('upper_quartile'), F.expr('percentile(duration, array(0.5))')[0].alias('duration_median')).withColumn("quartile_deviation", (F.col("upper_quartile") - F.col("lower_quartile"))/2)

outliersremoved = explodesplitdf.join(IQRdf, "genre", "left").filter(F.abs(F.col("duration")-F.col("duration_median")) >= (F.col("quartile_deviation")*2.2))

In [None]:
bounds = {
    c: dict(
        zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in df.columns
}


In [None]:
from pyspark.sql import functions as F

Seasons = df2.select("Seasons").distinct().rdd.flatMap(lambda x: x).collect()
FunctioningDay = df2.select("FunctioningDay").distinct().rdd.flatMap(lambda x: x).collect()
Holiday = df2.select(" Holiday").distinct().rdd.flatMap(lambda x: x).collect()
Dayoftheweek = df2.select(" Dayoftheweek").distinct().rdd.flatMap(lambda x: x).collect()

Seasons_expr = [F.when(F.col("Seasons") == s, 1).otherwise(0).alias("Season_" + s) for s in Seasons]
FunctioningDay_expr = [F.when(F.col("FunctioningDay") == f, 1).otherwise(0).alias("FunctioningDay_" + f) for f in FunctioningDay]
Holiday_expr = [F.when(F.col(" Holiday") == h, 1).otherwise(0).alias("Holiday_" + h) for h in Holiday]


df2_dum= df2.select( 'RentedBikeCount','Seasons','FunctioningDay',' Holiday', *Seasons_expr+FunctioningDay_expr+Holiday_expr)
df2_dum.show()