In [1]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession,SQLContext,Row
from pyspark.streaming import StreamingContext
# from pyspark.streaming.flume import FlumeUtils
from pyspark.storagelevel import StorageLevel
# from pyspark import StorageLevel
# from pyspark.storagelevel import StorageLevel
#from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaProducer
import pyspark.sql.functions as F
from pyspark.sql.types import *

# machine learning APIs
from pyspark.ml.feature import VectorAssembler, Vector
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Connect to the standalone master and ship the MySQL Connector/J jar so the
# JDBC write in `process` can load com.mysql.cj.jdbc.Driver.
spark = (
    SparkSession.builder
    .master('spark://nasa:7077')
    .appName('bikestreamingapp')
    .config("spark.executor.memory", "1g")
    .config('spark.jars', 'mysql-connector-j-8.1.0.jar')
    .config("spark.cores.max", "2")
    .getOrCreate()
)

24/03/11 05:33:51 WARN Utils: Your hostname, nasa resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/03/11 05:33:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Show the active SparkSession object as this cell's output.
spark

# 1) Flume configuration file

In [3]:
# Raise the log threshold from the default "WARN" (announced in the startup
# output above) to "ERROR" so streaming output is not buried in warnings.
spark.sparkContext.setLogLevel("ERROR")
# Enable adaptive query execution for this session.
spark.conf.set("spark.sql.adaptive.enabled",True)

In [67]:
# Confirm the Flume agent configuration file is in place.
!ls -l /home/hadoop/flume/conf/bikestreaming.properties

-rwxrwx--x 1 hadoop hadoop 768 mar 10 20:17 /home/hadoop/flume/conf/bikestreaming.properties


In [57]:
# Print the agent definition: a spooldir source feeding a memory channel that
# drains into a file_roll sink writing to the streamdata directory.
!cat /home/hadoop/flume/conf/bikestreaming.properties

#flume configuration

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = mem1


#Define a source for agent1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/home/hadoop/flume/sent
agent1.sources.source1.fileHeader=true
agent1.sources.source1.fileSuffix=.SENT


#Define sink for agent1 local file system
agent1.sinks.sink1.type= file_roll
agent1.sinks.sink1.sink.directory= /home/hadoop/edureka/pyspark/certificationproject/streamdata
agent1.sinks.sink1.sink.pathManager.prefix = bikestreamdata_

#Define a file channel called fileChannel on agent1
agent1.channels.mem1.type= memory
agent1.channels.mem1.capacity=1000
agent1.channels.mem1.transactionCapacity=100

#integrations
agent1.sources.source1.channels = mem1
agent1.sinks.sink1.channel = mem1




# Streaming the data through Flume

# Send data to the Spark streaming app by copying the CSV into Flume's spool directory

In [8]:
# Check the sample CSV that will be pushed through Flume.
!ls -l dataset/bikestreamdata.csv

-rw-rw-r-- 1 hadoop hadoop 617 mar  8 00:59 dataset/bikestreamdata.csv


In [39]:
# Drop the CSV into the spooldir source's directory (spoolDir=/home/hadoop/flume/sent);
# Flume ingests it and renames it with the configured .SENT suffix.
!cp dataset/bikestreamdata.csv ~/flume/sent

In [55]:
# A ".SENT" suffix on the file shows Flume has finished ingesting it.
!ls -l ~/flume/sent

total 4
-rw-rw-r-- 1 hadoop hadoop 617 mar  8 00:59 bikestreamdata.csv.SENT


# Check that the Flume sink received the stream data

In [56]:
# The file_roll sink writes received events into files prefixed "bikestreamdata_".
# NOTE(review): this relative path presumably resolves to the sink's configured
# directory (…/certificationproject/streamdata) — confirm the notebook's cwd.
!ls -l streamdata/

total 636
-rw-rw-r-- 1 hadoop hadoop 648256 mar 11 06:36 bikestreamdata_1712343464545-1


# 2) Spark Structured Streaming development and prediction

In [29]:
# Explicit schema for the streamed CSV rows. Streaming file sources do not
# infer a schema by default, so every column and its type is declared here.
myschema = (
    StructType()
    .add('datetime', 'timestamp')
    .add('season', 'integer')
    .add('holiday', 'integer')
    .add('workingday', 'integer')
    .add('weather', 'integer')
    .add('temp', 'double')
    .add('atemp', 'double')
    .add('humidity', 'integer')
    .add('windspeed', 'double')
    .add('casual', 'integer')
    .add('registered', 'integer')
    .add('count', 'integer')
)

In [30]:
# set streaming directory in hdfs
streamdir='/home/hadoop/edureka/pyspark/certificationproject/streamdata'
data = spark.readStream.csv \
(streamdir,schema=myschema,sep=',')

In [53]:
# Models loaded by `_load_lr_model`, keyed by path. Re-running this cell
# clears it, which forces a fresh load from disk.
_LR_MODEL_CACHE = {}


def _load_lr_model(path='lrmodel'):
    """Return the PipelineModel saved at `path`, loading it only on first use.

    `process` runs once per micro-batch, so loading inside it re-read the same
    model from disk for every batch.
    """
    model = _LR_MODEL_CACHE.get(path)
    if model is None:
        model = PipelineModel.load(path)
        _LR_MODEL_CACHE[path] = model
    return model


def _add_indicator_columns(df, col_name, levels=(1, 2, 3, 4)):
    """Add one `<col_name>_<k>` column per level k (k where col == k, else 0)."""
    # NOTE(review): these columns hold the level value k rather than 1. Keep it
    # that way unless 'lrmodel' is retrained, since its features were presumably
    # built with the same encoding — TODO confirm.
    for k in levels:
        df = df.withColumn(f'{col_name}_{k}', F.when(F.col(col_name) == k, k).otherwise(0))
    return df


def process(dsf, epoch_id):
    """Score one streaming micro-batch and write its predictions to MySQL.

    Used as the `foreachBatch` callback of the streaming query.

    Args:
        dsf: the micro-batch DataFrame (rows read with `myschema`).
        epoch_id: batch id supplied by Spark; not used here.
    """
    import os

    # Nothing to score. Fitting the encoders needs data, and an 'overwrite'
    # write of an empty frame would wipe the table, so skip the batch.
    if not dsf.head(1):
        return

    # One-hot encode the categorical columns into <col>_ecd.
    # NOTE(review): the encoders are fitted on each micro-batch, so the encoded
    # vector size depends on which levels appear in that batch. If 'lrmodel'
    # consumes the *_ecd columns, batches can disagree in size — TODO confirm.
    categorical_cols = ['season', 'holiday', 'workingday', 'weather']
    encoders = [OneHotEncoder(inputCol=c, outputCol=f'{c}_ecd') for c in categorical_cols]
    datamodel = Pipeline().setStages(encoders).fit(dsf).transform(dsf)

    # Per-level season and weather columns (season_1..season_4, weather_1..weather_4).
    datamodel = _add_indicator_columns(datamodel, 'season')
    datamodel = _add_indicator_columns(datamodel, 'weather')

    # Calendar features taken from the timestamp.
    dfdate = (
        datamodel
        .withColumn('year', F.year(F.col('datetime')))
        .withColumn('month', F.month(F.col('datetime')))
        .withColumn('day', F.dayofmonth(F.col('datetime')))
        .withColumn('hour', F.hour(F.col('datetime')))
    )

    # NOTE(review): in this notebook's recorded output, feature index 3
    # (16.0, 40.0, 32.0, 13.0, 1.0) closely tracks the prediction
    # (16.25, 40.13, 32.21, 13.36, 1.46). If that slot is the `count` target,
    # the model is being fed its own label — check the training pipeline.
    predictiondata = _load_lr_model('lrmodel').transform(dfdate)

    # The predictions feed two actions (show + JDBC write). Cache them so the
    # encoders and model are not recomputed for the second action.
    predictiondata.persist()
    try:
        predictiondata.select('year', 'month', 'day', 'features', 'prediction').show(truncate=False)

        # Credentials come from the environment when set; the fallbacks are
        # the values that were previously hard-coded here.
        jdbc_user = os.environ.get('MYSQL_USER', 'hadoop')
        jdbc_password = os.environ.get('MYSQL_PASSWORD', 'hadoop')

        # write prediction into RDBMS
        # NOTE(review): 'overwrite' replaces the table on every micro-batch, so
        # only the latest batch's predictions are kept; use 'append' if the
        # full history is wanted.
        print('writing data ni Mysql databases..')
        (
            predictiondata.select('year', 'month', 'day', 'prediction').write
            .mode('overwrite')
            .format("jdbc")
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .option("url", "jdbc:mysql://localhost:3306/sparkdb")
            .option("dbtable", "bikestreamprediction")
            .option("user", jdbc_user)
            .option("password", jdbc_password)
            .save()
        )
        print('Data writed successfully.')
    finally:
        predictiondata.unpersist()
    

In [54]:
# Start the streaming query. Each micro-batch of `data` is passed to `process`,
# which scores it and writes the predictions to MySQL.
# NOTE(review): no checkpointLocation is set, so Spark uses a temporary
# checkpoint and a restarted query would not resume where it stopped.
query = (
    data.writeStream
    .foreachBatch(process)
    .outputMode("update")
    .start()
)

+----+-----+---+---------------------------------------------------------------------------------+------------------+
|year|month|day|features                                                                         |prediction        |
+----+-----+---+---------------------------------------------------------------------------------+------------------+
|2011|1    |1  |(18,[2,3,5,8,9,11,14,15,16],[9.84,16.0,1.0,1.0,1.0,1.0,2011.0,1.0,1.0])          |16.254671829779   |
|2011|1    |1  |(18,[2,3,5,8,9,11,14,15,16,17],[9.02,40.0,1.0,1.0,1.0,1.0,2011.0,1.0,1.0,1.0])   |40.13419650353228 |
|2011|1    |1  |(18,[2,3,5,8,9,11,14,15,16,17],[9.02,32.0,1.0,1.0,1.0,1.0,2011.0,1.0,1.0,2.0])   |32.2078404506982  |
|2011|1    |1  |(18,[2,3,5,8,9,11,14,15,16,17],[9.84,13.0,1.0,1.0,1.0,1.0,2011.0,1.0,1.0,3.0])   |13.361494657726439|
|2011|1    |1  |(18,[2,3,5,8,9,11,14,15,16,17],[9.84,1.0,1.0,1.0,1.0,1.0,2011.0,1.0,1.0,4.0])    |1.4568736550925792|
|2011|1    |1  |(18,[2,3,5,8,9,12,14,15,16,17],[9.84,1.0

                                                                                

Data writed successfully.
