In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder \
.master("local[4]") \
.appName("Csv-Üzeri-SQL") \
.config("spark.executor.memory","4g") \
.config("spark.driver.memory","2g") \
.getOrCreate()

In [3]:
olympic = spark.read \
.option("sep", ",") \
.option("header", "True") \
.option("inferSchema", "True") \
.csv("/home/jovyan/data/all_medalists.csv")

In [4]:
google = spark.read \
.option("sep", ",") \
.option("header", "True") \
.option("inferSchema", "True") \
.csv("/home/jovyan/data/googleplaystore.csv")

##### !!

In [5]:
retail = spark.read \
.option("sep", ";") \
.option("header", "True") \
.option("inferSchema", "True") \
.csv("/home/jovyan/data/OnlineRetail.csv") \
.withColumn("Date", to_timestamp("InvoiceDate", "dd.MM.yyyy HH:mm"))

In [6]:
retail.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|               Date|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|2010-12-01 08:26:00|
+-------

In [7]:
olympic.createOrReplaceTempView("Olympic")

In [8]:
google.createOrReplaceTempView("Google")

In [9]:
retail.createOrReplaceTempView("Retail")

# Basic SQL - 1

In [10]:
spark.sql("""

with cte as
(
SELECT NOC, 
CASE WHEN Medal = "Gold" THEN 1 ELSE 0 END AS Gold,
CASE WHEN Medal = "Silver" THEN 1 ELSE 0 END AS Silver,
CASE WHEN Medal = "Bronze" THEN 1 ELSE 0 END AS Bronze,
1 AS Piece
FROM Olympic
WHERE Medal in ("Gold", "Silver", "Bronze")
)
select NOC, SUM(Gold) AS Gold, SUM(Silver) AS Silver,
SUM(Bronze) AS Bronze, SUM(Piece) AS TOTAL
FROM cte
GROUP BY NOC
ORDER BY TOTAL DESC
""").show(10)

+---+----+------+------+-----+
|NOC|Gold|Silver|Bronze|TOTAL|
+---+----+------+------+-----+
|USA|2088|  1195|  1052| 4335|
|URS| 838|   627|   584| 2049|
|GBR| 498|   591|   505| 1594|
|FRA| 378|   461|   475| 1314|
|ITA| 460|   394|   374| 1228|
|GER| 407|   350|   454| 1211|
|AUS| 293|   369|   413| 1075|
|HUN| 400|   308|   345| 1053|
|SWE| 347|   349|   325| 1021|
|GDR| 329|   271|   225|  825|
+---+----+------+------+-----+
only showing top 10 rows



# Spark SQL - 1 

In [12]:
olympic \
.withColumn("Gold", when(olympic.Medal=="Gold", 1).otherwise(0)) \
.withColumn("Silver", when(col("Medal")=="Silver", 1).otherwise(0)) \
.withColumn("Bronze", when(olympic.Medal == "Bronze", 1).otherwise(0)) \
.groupBy("NOC") \
.agg(sum("Gold").alias('Gold'), 
     sum("Silver").alias('Silver'), 
     sum("Bronze").alias('Bronze'),
     (sum("Gold")+sum("Silver")+sum("Bronze")).alias('TOTAL'))  \
.orderBy(desc("TOTAL")) \
.show(10)

+------+-------+--------+----------+--------------------+---+------+--------------------+------------+------+
|  City|Edition|   Sport|Discipline|             Athlete|NOC|Gender|               Event|Event_gender| Medal|
+------+-------+--------+----------+--------------------+---+------+--------------------+------------+------+
|Athens|   1896|Aquatics|  Swimming|       HAJOS, Alfred|HUN|   Men|      100m freestyle|           M|  Gold|
|Athens|   1896|Aquatics|  Swimming|    HERSCHMANN, Otto|AUT|   Men|      100m freestyle|           M|Silver|
|Athens|   1896|Aquatics|  Swimming|   DRIVAS, Dimitrios|GRE|   Men|100m freestyle fo...|           M|Bronze|
|Athens|   1896|Aquatics|  Swimming|  MALOKINIS, Ioannis|GRE|   Men|100m freestyle fo...|           M|  Gold|
|Athens|   1896|Aquatics|  Swimming|  CHASAPIS, Spiridon|GRE|   Men|100m freestyle fo...|           M|Silver|
|Athens|   1896|Aquatics|  Swimming|CHOROPHAS, Efstat...|GRE|   Men|     1200m freestyle|           M|Bronze|
|Athens|  

# Basic SQL - 2 

In [11]:
spark.sql("""


with cte as
(
SELECT Distinct Category, Size,
SUBSTRING(Size, 1, LENGTH(Size) - 1) as SonKaraktereKadar1,
LEFT(Size, LENGTH(Size)-1) as SonKaraktereKadar2,
COUNT(*) AS Adet
FROM Google
WHERE Size LIKE "%M" AND Category in ('GAME', "FAMILY", "TOOLS")
GROUP BY Category, Size
),
cte2 as
(
Select Category, 
CASE WHEN SonKaraktereKadar1 BETWEEN 50 AND 100 THEN "50-100 M" ELSE "0-50 M" END AS SizeCat,
Adet AS Total
from CTE
)
SELECT Category, SizeCat, SUM(Total) AS Total
FROM cte2
GROUP BY Category, SizeCat
ORDER BY Category, SizeCat

""").show(10)

+--------+--------+-----+
|Category| SizeCat|Total|
+--------+--------+-----+
|  FAMILY|  0-50 M| 1427|
|  FAMILY|50-100 M|  373|
|    GAME|  0-50 M|  615|
|    GAME|50-100 M|  397|
|   TOOLS|  0-50 M|  649|
|   TOOLS|50-100 M|   10|
+--------+--------+-----+



# Spark SQL - 2

In [16]:
google \
.filter(google.Size.like('%M')) \
.filter(google.Category.isin('GAME', "FAMILY", "TOOLS")) \
.withColumn("SizeNum", google.Size.substr(lit(0), length(google.Size)-1)) \
.withColumn("SizeCat", when(col("SizeNum").between(50,100), "50-100 M").otherwise("0-50 M")) \
.groupBy("Category", "SizeCat") \
.agg(count("Category").alias("Total")) \
.orderBy("Category", "SizeCat") \
.show(25)

+--------+--------+-----+
|Category| SizeCat|Total|
+--------+--------+-----+
|  FAMILY|  0-50 M| 1427|
|  FAMILY|50-100 M|  373|
|    GAME|  0-50 M|  615|
|    GAME|50-100 M|  397|
|   TOOLS|  0-50 M|  649|
|   TOOLS|50-100 M|   10|
+--------+--------+-----+



# Basic SQL - 3 

In [13]:
#Aşağıdaki Query'i anlamak için
spark.sql("""

with cte as
(
Select Category,
REPLACE(SUBSTRING(Installs,1, LENGTH(Installs)-1), ",","") as InstallCount, Count(*) as Num
from Google
WHERE Installs LIKE ('%+')
GROUP BY Category, Installs
), cte2 as
(
select Category, CAST(InstallCount AS int), Num  from cte
ORDER BY 2 desc, 3 desc 
)
Select *
from cte2

""").show(5)

+----------------+------------+---+
|        Category|InstallCount|Num|
+----------------+------------+---+
|   COMMUNICATION|  1000000000| 19|
|          SOCIAL|  1000000000|  8|
|            GAME|  1000000000|  6|
|TRAVEL_AND_LOCAL|  1000000000|  5|
|    PRODUCTIVITY|  1000000000|  4|
+----------------+------------+---+
only showing top 5 rows



In [14]:
# Her yüklenme kategorinde en fazla indirilen category'i listelemek.
# RANK kullanılınca mesela 0'dan 2 tane indirilen 3 kategori var üçü de geliyor eşit olduğu için 
# ROW_NUMBER'da alfabetik olarak ilki geliyor.
spark.sql("""

with cte as
(
Select Category,
REPLACE(SUBSTRING(Installs,1, LENGTH(Installs)-1), ",","") as InstallCount, Count(*) as Num
from Google
WHERE Installs LIKE ('%+')
GROUP BY Category, Installs
), cte2 as
(
select Category, CAST(InstallCount AS int), Num  from cte
ORDER BY 2 desc, 3 desc 
), cte3 as
(
Select *,
RANK() OVER (PARTITION BY InstallCount ORDER BY InstallCount DESC, Num DESC) as rank
from cte2
)
Select Category, InstallCount, Num as Total 
from cte3
where rank = 1
ORDER BY InstallCount DESC, Category

""").show(1000)

+-------------+------------+-----+
|     Category|InstallCount|Total|
+-------------+------------+-----+
|COMMUNICATION|  1000000000|   19|
|COMMUNICATION|   500000000|   17|
|         GAME|   500000000|   17|
|         GAME|   100000000|  134|
|         GAME|    50000000|   84|
|         GAME|    10000000|  225|
|       FAMILY|     5000000|  123|
|       FAMILY|     1000000|  275|
|       FAMILY|      500000|  109|
|       FAMILY|      100000|  256|
|       FAMILY|       50000|  101|
|       FAMILY|       10000|  231|
|       FAMILY|        5000|  108|
|       FAMILY|        1000|  215|
|       FAMILY|         500|   69|
|       FAMILY|         100|  116|
|       FAMILY|          50|   35|
|       FAMILY|          10|   61|
|     BUSINESS|           5|   18|
|       FAMILY|           1|   10|
|       FAMILY|           0|    2|
|      FINANCE|           0|    2|
|       SOCIAL|           0|    2|
+-------------+------------+-----+



# Spark SQL - 3

In [22]:
#RANK için aşağıdaki kütüphane import edilir.
#from pyspark.sql.window import Window

google \
.filter(google.Installs.like('%+')) \
.withColumn("Installs2", google.Installs.substr(lit(0), length(google.Installs)-1)) \
.withColumn("InstallCount", regexp_replace(col("Installs2"), ",", "").cast(IntegerType())) \
.groupBy("Category", "InstallCount") \
.agg(count("Category").alias("Total")) \
.withColumn("Rank", rank().over(Window.partitionBy("InstallCount").orderBy(desc("Total")))) \
.filter(col("Rank") == 1) \
.select("Category", "InstallCount", "Total") \
.orderBy(desc("InstallCount"), "Category") \
.show(25)

+-------------+------------+-----+
|     Category|InstallCount|Total|
+-------------+------------+-----+
|COMMUNICATION|  1000000000|   19|
|COMMUNICATION|   500000000|   17|
|         GAME|   500000000|   17|
|         GAME|   100000000|  134|
|         GAME|    50000000|   84|
|         GAME|    10000000|  225|
|       FAMILY|     5000000|  123|
|       FAMILY|     1000000|  275|
|       FAMILY|      500000|  109|
|       FAMILY|      100000|  256|
|       FAMILY|       50000|  101|
|       FAMILY|       10000|  231|
|       FAMILY|        5000|  108|
|       FAMILY|        1000|  215|
|       FAMILY|         500|   69|
|       FAMILY|         100|  116|
|       FAMILY|          50|   35|
|       FAMILY|          10|   61|
|     BUSINESS|           5|   18|
|       FAMILY|           1|   10|
|       FAMILY|           0|    2|
|      FINANCE|           0|    2|
|       SOCIAL|           0|    2|
+-------------+------------+-----+



# Basic SQL - 4 

In [16]:
spark.sql("""

with cte as
(
Select Edition, NOC, Count(*) as MedalCount,
RANK() over (partition by Edition ORDER BY Count(*) DESC) AS Rank
FROM Olympic
WHERE Edition BETWEEN 1992 AND 2008 
AND Medal='Gold' 
GROUP BY Edition, NOC
Having Count(*)>10
ORDER BY Edition, MedalCount desc
)
Select Edition, NOC, MedalCount 
from cte
WHERE RANK <=3

""").show(100)

+-------+---+----------+
|Edition|NOC|MedalCount|
+-------+---+----------+
|   1992|EUN|        92|
|   1992|USA|        89|
|   1992|GER|        81|
|   1996|USA|       160|
|   1996|GER|        42|
|   1996|CUB|        39|
|   2000|USA|       130|
|   2000|RUS|        66|
|   2000|AUS|        60|
|   2004|USA|       116|
|   2004|CHN|        52|
|   2004|AUS|        49|
|   2008|USA|       125|
|   2008|CHN|        74|
|   2008|RUS|        43|
+-------+---+----------+



# Spark SQL - 4

In [17]:
olympic \
.filter((olympic.Edition.between(1992, 2008)) & (olympic.Medal=="Gold")) \
.groupBy("Edition", "NOC") \
.agg(count("Edition").alias("MedalCount")) \
.withColumn("Rank", rank().over(Window.partitionBy("Edition").orderBy(desc("MedalCount")))) \
.filter(col("Rank").between(1,3)) \
.select("Edition", "NOC", "MedalCount") \
.orderBy("Edition") \
.show(75)

+-------+---+----------+
|Edition|NOC|MedalCount|
+-------+---+----------+
|   1992|USA|        89|
|   1992|GER|        81|
|   1992|EUN|        92|
|   1996|USA|       160|
|   1996|GER|        42|
|   1996|CUB|        39|
|   2000|USA|       130|
|   2000|RUS|        66|
|   2000|AUS|        60|
|   2004|CHN|        52|
|   2004|AUS|        49|
|   2004|USA|       116|
|   2008|USA|       125|
|   2008|CHN|        74|
|   2008|RUS|        43|
+-------+---+----------+



# Basic SQL - 5

In [24]:
spark.sql("""

with cte as 
(
select *,
cast(Quantity as int)*cast(replace(UnitPrice,',','.') as float) as ToplamTutar,
CASE WHEN LEFT(InvoiceNo,1) = 'C' Then 'İptal Edilmiş' Else 'Tahsil Edilmiş' end as Durum,
YEAR(Date) as Yil, MONTH(Date) as Ay
from Retail
)
Select Ay, Yil, Durum, SUM(ToplamTutar) as Toplam
from cte
GROUP BY Yil, Ay, Durum
ORDER BY Yil, Ay, Durum


""").show(30)

+---+----+--------------+-------------------+
| Ay| Yil|         Durum|             Toplam|
+---+----+--------------+-------------------+
| 12|2010|Tahsil Edilmiş|  823746.1386869252|
| 12|2010| İptal Edilmiş|  -74789.1201132536|
|  1|2011|Tahsil Edilmiş|  691364.5523450822|
|  1|2011| İptal Edilmiş|-131364.29426512122|
|  2|2011|Tahsil Edilmiş| 523631.88908462226|
|  2|2011| İptal Edilmiş|-25569.239793926477|
|  3|2011|Tahsil Edilmiş|  717639.3586430848|
|  3|2011| İptal Edilmiş| -34372.27955004573|
|  4|2011|Tahsil Edilmiş|  537808.6189456339|
|  4|2011| İptal Edilmiş| -44601.49981290102|
|  5|2011|Tahsil Edilmiş|  770536.0177987814|
|  5|2011| İptal Edilmiş| -47202.50994770229|
|  6|2011|Tahsil Edilmiş|   761739.898292534|
|  6|2011| İptal Edilmiş| -70616.78039768338|
|  7|2011|Tahsil Edilmiş|  719221.1889705638|
|  7|2011| İptal Edilmiş| -37921.08029431105|
|  8|2011|Tahsil Edilmiş|  737014.2585139647|
|  8|2011| İptal Edilmiş|-54333.749689251184|
|  9|2011|Tahsil Edilmiş| 1058590.

# Spark SQL - 5

#### replace kullanmaya çalıştım belki yarım saat uğraştım, onun yerine regexp_replace varmış.

In [48]:
#Like'lı ifade retail.InvoiceNo.contains('C') şeklinde de yazılabilirdi.
#float ve int dönüşümlerini kendi yapıyor.
retail \
.withColumn("Yil", year(col("Date"))) \
.withColumn("Ay", month(col("Date"))) \
.withColumn("Durum", when(retail.InvoiceNo.like('C%'), 'İptal Edilmiş').otherwise('Tahsil Edilmiş')) \
.withColumn("UnitPriceF", regexp_replace("UnitPrice", ",", ".")) \
.withColumn("Tutar", col("Quantity")*col("UnitPriceF")) \
.groupby("Yil", "Ay", "Durum") \
.agg(sum("Tutar").alias("Toplam")) \
.sort("Yil", "Ay", "Durum") \
.show(50)

+----+---+--------------+-------------------+
| Yil| Ay|         Durum|             Toplam|
+----+---+--------------+-------------------+
|2010| 12|Tahsil Edilmiş|  823746.1399999646|
|2010| 12| İptal Edilmiş| -74789.11999999965|
|2011|  1|Tahsil Edilmiş|  691364.5600000108|
|2011|  1| İptal Edilmiş|-131364.29999999967|
|2011|  2|Tahsil Edilmiş|  523631.8900000278|
|2011|  2| İptal Edilmiş|-25569.240000000005|
|2011|  3|Tahsil Edilmiş|  717639.3600000187|
|2011|  3| İptal Edilmiş| -34372.28000000007|
|2011|  4|Tahsil Edilmiş|  537808.6210000159|
|2011|  4| İptal Edilmiş|           -44601.5|
|2011|  5|Tahsil Edilmiş|  770536.0200000107|
|2011|  5| İptal Edilmiş| -47202.50999999987|
|2011|  6|Tahsil Edilmiş|  761739.9000000219|
|2011|  6| İptal Edilmiş|  -70616.7799999997|
|2011|  7|Tahsil Edilmiş|  719221.1910000272|
|2011|  7| İptal Edilmiş|-37921.079999999994|
|2011|  8|Tahsil Edilmiş|   737014.260000017|
|2011|  8| İptal Edilmiş| -54333.75000000007|
|2011|  9|Tahsil Edilmiş|  1058590