# Spark Stream

# I/ Video games

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.appName('StructuredNetworkWordCount').getOrCreate()

# Schema applied to the video-game sales CSV files.
userSchema = (
    StructType()
    .add('Rank', 'integer')
    .add('Name', 'string')
    .add('Platform', 'string')
    .add('Year', 'double')
    .add('Genre', 'string')
    .add('Publisher', 'string')
    .add('NA_Sales', 'double')
    .add('EU_Sales', 'double')
    .add('JP_Sales', 'double')
    .add('Other_Sales', 'double')
    .add('Global_Sales', 'double')
)

# Streaming DataFrame over the comma-separated files (with a header row)
# found under the watched directory.
dfCSV = (
    spark.readStream
    .option('sep', ',')
    .option('header', 'true')
    .schema(userSchema)
    .csv('/tmp/spark_stream/vgames')
)

# Expose the streaming DataFrame to Spark SQL under the name VideoGames.
dfCSV.createOrReplaceTempView('VideoGames')

# Global sales of the 2006 titles published by Nintendo.
# (LIKE without wildcards behaves as an exact match.)
global_2006_nintendo = spark.sql("SELECT Global_Sales FROM VideoGames where Year=2006 and  Publisher like 'Nintendo'")

# Total global sales for each year.
Total_Global_Sales = spark.sql("SELECT Year, SUM(Global_Sales) FROM VideoGames GROUP BY Year")

# Average global sales for each (platform, game name) pair.
Avg_Global_sales=spark.sql("SELECT Platform, Name ,MEAN(Global_Sales) FROM VideoGames GROUP BY Platform, Name")

# getOrCreate() may hand back a session on which queries already terminated
# (earlier section, or a re-run of this one). Forget those terminations,
# otherwise awaitAnyTermination() below returns or re-raises immediately.
spark.streams.resetTerminated()

# Start the queries; each one prints its result to the console.
# The plain filter uses append mode, the aggregations use complete mode.
query = global_2006_nintendo.writeStream.outputMode('Append').format('console').start()
query1 = Total_Global_Sales.writeStream.outputMode('Complete').format('console').start()
query2 = Avg_Global_sales.writeStream.outputMode('Complete').format('console').start()

try:
    # Block until any query on this session terminates.
    spark.streams.awaitAnyTermination()
finally:
    # Stop this section's queries so they do not keep running (and printing)
    # in the background after an interrupt or a failure.
    for started_query in (query, query1, query2):
        started_query.stop()

# II/ Cars-2

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.appName('StructuredNetworkWordCount').getOrCreate()

# Schema applied to the car sales CSV files.
userSchema = (
    StructType()
    .add('Manufacturer', 'string')
    .add('Model', 'string')
    .add('Sales_in_thousands', 'double')
    .add('__year_resale_value', 'double')
    .add('Vehicle_type', 'string')
    .add('Price_in_thousands', 'double')
    .add('Engine_size', 'double')
    .add('Horsepower', 'double')
    .add('Wheelbase', 'double')
    .add('Width', 'double')
    .add('Length', 'double')
    .add('Curb_weight', 'double')
    .add('Fuel_capacity', 'double')
    .add('Fuel_efficiency', 'double')
    .add('Latest_Launch', 'string')
    .add('Power_perf_factor', 'double')
)

# Streaming DataFrame over the comma-separated files (with a header row)
# found under the watched directory.
dfCSV = (
    spark.readStream
    .option('sep', ',')
    .option('header', 'true')
    .schema(userSchema)
    .csv('/tmp/spark_stream/cars-2')
)

# Expose the streaming DataFrame to Spark SQL under the name CarsTable.
dfCSV.createOrReplaceTempView('CarsTable')

# Models ranked by average fuel efficiency, best first.
top_fuel_eff_cars = spark.sql("SELECT Model, AVG(Fuel_efficiency) FROM  CarsTable  group by Model ORDER BY AVG(Fuel_efficiency) DESC")

# Manufacturers ranked by average fuel efficiency, best first.
top_fuel_eff_manu = spark.sql("SELECT Manufacturer, AVG(Fuel_efficiency) FROM  CarsTable  group by Manufacturer ORDER BY AVG(Fuel_efficiency) DESC")

# Total sales (in thousands) per manufacturer, largest first.
Total_Sales = spark.sql("SELECT Manufacturer, SUM(Sales_in_thousands) as TotalSales FROM CarsTable  GROUP BY Manufacturer order by TotalSales DESC")

# Audi cars with their launch year, taken from the last four characters of
# Latest_Launch. The rows are NOT ordered: the query has no ORDER BY and runs
# in append mode without an aggregation.
Audi_cars=spark.sql("SELECT Manufacturer, Model ,SUBSTRING(Latest_Launch, length(Latest_Launch)-3 ,4 ) AS Year  FROM CarsTable where Manufacturer like 'Audi' ")

# getOrCreate() may hand back a session on which queries already terminated
# (earlier section, or a re-run of this one). Forget those terminations,
# otherwise awaitAnyTermination() below returns or re-raises immediately.
spark.streams.resetTerminated()

# Start the queries; each one prints its result to the console.
# The sorted aggregations use complete mode, the plain filter uses append mode.
query = top_fuel_eff_cars.writeStream.outputMode('complete').format('console').start()
query1 = top_fuel_eff_manu.writeStream.outputMode('complete').format('console').start()
query2 = Total_Sales.writeStream.outputMode('complete').format('console').start()
query3 = Audi_cars.writeStream.outputMode('append').format('console').start()

try:
    # Block until any query on this session terminates.
    spark.streams.awaitAnyTermination()
finally:
    # Stop this section's queries so they do not keep running (and printing)
    # in the background after an interrupt or a failure.
    for started_query in (query, query1, query2, query3):
        started_query.stop()

# III/ Google Play

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.appName('StructuredNetworkWordCount').getOrCreate()

# Schema applied to the Google Play CSV files.
userSchema = (
    StructType()
    .add('App', 'string')
    .add('Category', 'string')
    .add('Rating', 'float')
    .add('Reviews', 'float')
    .add('numberOfInstalls', 'float')
    .add('Type', 'string')
    .add('price', 'float')
    .add('Genres', 'string')
)

# Streaming DataFrame over the comma-separated files (with a header row)
# found under the watched directory.
dfCSV = (
    spark.readStream
    .option('sep', ',')
    .option('header', 'true')
    .schema(userSchema)
    .csv('/tmp/spark_stream/googleplay')
)

# Expose the streaming DataFrame to Spark SQL under the name Googleplay.
dfCSV.createOrReplaceTempView('Googleplay')

# Apps ranked by average number of installs, highest first.
top_installs = spark.sql("SELECT App,AVG(numberOfInstalls) FROM Googleplay group by App order by AVG(numberOfInstalls) DESC ")

# Apps whose genre is Medical, ranked by average rating, highest first.
top_rate_medapp = spark.sql("SELECT App , AVG(Rating) from Googleplay where Genres like 'Medical' group by App order by AVG(Rating) DESC")

# Apps ranked by average price, most expensive first.
top_priced_app = spark.sql("SELECT App ,AVG(price) AS price from Googleplay group by App order by price DESC")

# Genres ranked by average rating, highest first.
# (The grouping column is Genres, not Category.)
top_rated_cat=spark.sql("SELECT  Genres,AVG(Rating) as average_rating from Googleplay Group BY Genres order by average_rating DESC")

# The five apps of genre Arcade with the largest total number of installs.
top_inst_arc=spark.sql("SELECT  App, SUM(numberOfInstalls) from Googleplay  where Genres like 'Arcade' Group BY App order by SUM(numberOfInstalls)  DESC limit 5")

# getOrCreate() may hand back a session on which queries already terminated
# (earlier section, or a re-run of this one). Forget those terminations,
# otherwise awaitAnyTermination() below returns or re-raises immediately.
spark.streams.resetTerminated()

# Start the queries; each one prints its result to the console.
# All of them are sorted aggregations, hence complete output mode.
query = top_installs.writeStream.outputMode('complete').format('console').start()
query1 = top_rate_medapp.writeStream.outputMode('complete').format('console').start()
query2 = top_priced_app.writeStream.outputMode('complete').format('console').start()
query3 = top_rated_cat.writeStream.outputMode('complete').format('console').start()
query4 = top_inst_arc.writeStream.outputMode('complete').format('console').start()

try:
    # Block until any query on this session terminates.
    spark.streams.awaitAnyTermination()
finally:
    # Stop this section's queries so they do not keep running (and printing)
    # in the background after an interrupt or a failure.
    for started_query in (query, query1, query2, query3, query4):
        started_query.stop()