In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import Row
from numpy import prod
import datetime
import pandas

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.memory", "1g") \
    .getOrCreate()

sc = spark.sparkContext

from pyspark.streaming import StreamingContext

streamContext = StreamingContext(sc,1)

def consumption(mpg):
    if float(mpg)>22:
        return('high_consumption')
    else:
        return('low_consumption')

consumption_udf = udf(consumption, StringType())


lines = streamContext.socketTextStream("localhost", 9000)

def my_processing(rdd):
    my_lines = rdd.map(lambda l: l.split(","))\
    .map(lambda p: Row(time=p[0],name=p[1], mpg=p[2], cyl=p[3], \
                       disp=p[4], hp=p[5], drat=p[6], wt=p[7], qsec=p[8], vs=p[9],\
                      am=p[10], gear=p[11], carb=p[12]))
    mtcars = my_lines.toDF(["am","carb","cyl","disp","drat","gear","hp","mpg","name","qsec","time","vs","wt"])
    mtcars = mtcars.withColumn('mpg_per_hp', mtcars.mpg * mtcars.hp )
    mtcars = mtcars.withColumn('mpg_', consumption_udf(mtcars.mpg))
    print(mtcars.show())
    print('############################################################################')
    
lines.foreachRDD(my_processing)

streamContext.start()

+---+----+---+-----+----+----+---+----+----------+----+--------+---+----+------------------+---------------+
| am|carb|cyl| disp|drat|gear| hp| mpg|      name|qsec|    time| vs|  wt|        mpg_per_hp|           mpg_|
+---+----+---+-----+----+----+---+----+----------+----+--------+---+----+------------------+---------------+
|  0|   3|  8|275.8|3.07|   3|180|16.4|Merc 450SE|17.4|11:55:32|  0|4.07|2951.9999999999995|low_consumption|
+---+----+---+-----+----+----+---+----+----------+----+--------+---+----+------------------+---------------+

None
############################################################################
+---+----+---+-----+----+----+---+---+-------------+----+--------+---+----+----------+----------------+
| am|carb|cyl| disp|drat|gear| hp|mpg|         name|qsec|    time| vs|  wt|mpg_per_hp|            mpg_|
+---+----+---+-----+----+----+---+---+-------------+----+--------+---+----+----------+----------------+
|  1|   2|  4|120.3|4.43|   5| 91| 26|Porsche 914-2|16.7|11: