In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv('Case.csv', header=True)
df.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| TRUE|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| TRUE| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| TRUE|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| TRUE|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| TRUE|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| TRUE|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| TRUE|       Dongan Churc

In [4]:
df.createOrReplaceTempView('cases')
spark.sql('select * from cases where limit 5').show()

+--------+--------+------------+-----+--------------------+---------+---------+----------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|  Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|   Gwanak-gu| TRUE|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|     Guro-gu| TRUE| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|Yangcheon-gu| TRUE|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|   Dobong-gu| TRUE|     Day Care Center|       43|37.679422|127.044374|
+--------+--------+------------+-----+--------------------+---------+---------+----------+



In [5]:
# filtering the data based on confirmed cases are greater than 100

query = '''select * from cases where confirmed > 100'''
confirmed_gt = spark.sql(query)
confirmed_gt.show()

+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| case_id|         province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|            Seoul|     Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|            Seoul|      Gwanak-gu| TRUE|             Richway|      119| 37.48208|126.901384|
| 1000036|            Seoul|              -|FALSE|     overseas inflow|      298|        -|         -|
| 1000037|            Seoul|              -|FALSE|contact with patient|      162|        -|         -|
| 1200001|            Daegu|         Nam-gu| TRUE|  Shincheonji Church|     4511| 35.84008|  128.5667|
| 1200002|            Daegu|   Dalseong-gun| TRUE|Second Mi-Ju Hosp...|      196|35.857375|128.466651|
| 1200003|            Daegu|         Seo-gu| TRUE|Hansarang Convale...|  

In [6]:
# get the count of df before filtering adn after filtering 
query = '''select count(*) from cases'''
actual_count =  spark.sql(query)
actual_count.show()

+--------+
|count(1)|
+--------+
|     174|
+--------+



In [7]:
query1 = '''select count(*) from cases where confirmed > 100'''
cnt = spark.sql(query1)
cnt.show()

+--------+
|count(1)|
+--------+
|      16|
+--------+



In [8]:
query = '''select count(province), city from cases group by city '''
res = spark.sql(query)
res.show()

+---------------+---------------+
|count(province)|           city|
+---------------+---------------+
|              4|     Gangnam-gu|
|              1|     Cheonan-si|
|             51|from other city|
|              2|      Anyang-si|
|              2|      Gwanak-gu|
|              1|     Yongsan-gu|
|              2|        Dong-gu|
|              2|         Sejong|
|              1|     Gangseo-gu|
|              1|       Wonju-si|
|              1|     Suyeong-gu|
|              2|   Geochang-gun|
|              1|  Dongdaemun-gu|
|              1|     Dongnae-gu|
|              1|         Jin-gu|
|              1|     Yangsan-si|
|              1|    Changwon-si|
|              1|         Nam-gu|
|              3|   Gyeongsan-si|
|              2|      Jongno-gu|
+---------------+---------------+
only showing top 20 rows



In [9]:
query = '''select avg(confirmed), city from cases group by city'''
res =  spark.sql(query)
res.show()

+------------------------------+---------------+
|avg(CAST(confirmed AS DOUBLE))|           city|
+------------------------------+---------------+
|                           4.5|     Gangnam-gu|
|                         103.0|     Cheonan-si|
|            23.862745098039216|from other city|
|                          19.5|      Anyang-si|
|                          74.5|      Gwanak-gu|
|                         139.0|     Yongsan-gu|
|                          22.0|        Dong-gu|
|                          19.5|         Sejong|
|                           0.0|     Gangseo-gu|
|                           4.0|       Wonju-si|
|                           5.0|     Suyeong-gu|
|                           9.0|   Geochang-gun|
|                          17.0|  Dongdaemun-gu|
|                          39.0|     Dongnae-gu|
|                           4.0|         Jin-gu|
|                           3.0|     Yangsan-si|
|                           7.0|    Changwon-si|
|                   

In [10]:
query = '''select avg(confirmed), province from cases group by province'''
res =  spark.sql(query)
res.show()

+------------------------------+-----------------+
|avg(CAST(confirmed AS DOUBLE))|         province|
+------------------------------+-----------------+
|             8.166666666666666|           Sejong|
|                         12.75|            Ulsan|
|             8.571428571428571|Chungcheongbuk-do|
|                          7.75|       Gangwon-do|
|                           8.6|          Gwangju|
|            101.84615384615384| Gyeongsangbuk-do|
|                         668.0|            Daegu|
|                          11.0| Gyeongsangnam-do|
|            28.857142857142858|          Incheon|
|                          4.75|          Jeju-do|
|             45.45454545454545|      Gyeonggi-do|
|                          15.6|            Busan|
|                          13.1|          Daejeon|
|             33.68421052631579|            Seoul|
|                         19.75|Chungcheongnam-do|
|                           4.6|     Jeollabuk-do|
|                           5.0

In [11]:
query = '''describe cases'''
spark.sql(query).show()

+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|       case_id|   string|   null|
|      province|   string|   null|
|          city|   string|   null|
|         group|   string|   null|
|infection_case|   string|   null|
|     confirmed|   string|   null|
|      latitude|   string|   null|
|     longitude|   string|   null|
+--------------+---------+-------+



In [12]:
query = '''select max(confirmed), min(confirmed),province from cases group by province'''
spark.sql(query).show()

+--------------+--------------+-----------------+
|max(confirmed)|min(confirmed)|         province|
+--------------+--------------+-----------------+
|             8|             1|           Sejong|
|             7|            16|            Ulsan|
|             9|            11|Chungcheongbuk-do|
|             7|             0|       Gangwon-do|
|             9|             1|          Gwangju|
|            68|            10| Gyeongsangbuk-do|
|           917|           101|            Daegu|
|             9|            10| Gyeongsangnam-do|
|            68|            11|          Incheon|
|             4|             0|          Jeju-do|
|            84|            10|      Gyeonggi-do|
|             6|             1|            Busan|
|             7|            13|          Daejeon|
|            95|             0|            Seoul|
|             9|            10|Chungcheongnam-do|
|             5|             1|     Jeollabuk-do|
|             4|             1|     Jeollanam-do|


In [13]:
query = '''select max(confirmed), min(confirmed),city from cases group by city'''
spark.sql(query).show()

+--------------+--------------+---------------+
|max(confirmed)|min(confirmed)|           city|
+--------------+--------------+---------------+
|             7|             1|     Gangnam-gu|
|           103|           103|     Cheonan-si|
|            22|            17|      Anyang-si|
|             9|             1|from other city|
|            30|           119|      Gwanak-gu|
|           139|           139|     Yongsan-gu|
|             5|            39|        Dong-gu|
|             8|            31|         Sejong|
|             0|             0|     Gangseo-gu|
|             4|             4|       Wonju-si|
|             5|             5|     Suyeong-gu|
|             8|            10|   Geochang-gun|
|            17|            17|  Dongdaemun-gu|
|            39|            39|     Dongnae-gu|
|             4|             4|         Jin-gu|
|             3|             3|     Yangsan-si|
|             7|             7|    Changwon-si|
|          4511|          4511|         

In [14]:
regions = spark.read.csv('Region.csv', header=True)
regions.createOrReplaceTempView('region')
query = '''select * from region limit 5'''
spark.sql(query).show()

+-----+--------+-----------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
| code|province|       city| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----+--------+-----------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|10000|   Seoul|      Seoul|37.566953|126.977977|                    607|               830|              48|         1.44|                   15.38|                5.8|             22739|
|10010|   Seoul| Gangnam-gu|37.518421|127.047222|                     33|                38|               0|         4.18|                   13.17|                4.3|              3088|
|10020|   Seoul|Gangdong-gu|37.530492|127.123837|           

In [15]:
## joining the dataframes 
query = '''select a.province, a.city, a.confirmed, b.elderly_population_ratio from cases a INNER JOIN region b on a.province == b.province'''
final = spark.sql(query)
final.show()

+--------+----------+---------+------------------------+
|province|      city|confirmed|elderly_population_ratio|
+--------+----------+---------+------------------------+
|   Seoul|Yongsan-gu|      139|                   16.65|
|   Seoul|Yongsan-gu|      139|                   18.42|
|   Seoul|Yongsan-gu|      139|                   18.27|
|   Seoul|Yongsan-gu|      139|                      17|
|   Seoul|Yongsan-gu|      139|                   16.87|
|   Seoul|Yongsan-gu|      139|                    15.6|
|   Seoul|Yongsan-gu|      139|                   13.55|
|   Seoul|Yongsan-gu|      139|                    13.1|
|   Seoul|Yongsan-gu|      139|                   16.15|
|   Seoul|Yongsan-gu|      139|                   14.76|
|   Seoul|Yongsan-gu|      139|                   13.39|
|   Seoul|Yongsan-gu|      139|                   16.77|
|   Seoul|Yongsan-gu|      139|                   14.05|
|   Seoul|Yongsan-gu|      139|                   15.85|
|   Seoul|Yongsan-gu|      139|

In [16]:
query = '''select province, city, elderly_population_ratio from region order by elderly_population_ratio desc'''
spark.sql(query).show()

+----------------+--------------+------------------------+
|        province|          city|elderly_population_ratio|
+----------------+--------------+------------------------+
|         Incheon|     Yeonsu-gu|                    9.48|
|          Sejong|        Sejong|                    9.48|
|         Gwangju|   Gwangsan-gu|                     9.1|
|     Gyeonggi-do|       Osan-si|                    9.09|
|Gyeongsangbuk-do|       Gumi-si|                    9.08|
|         Daejeon|    Yuseong-gu|                    9.04|
|     Gyeonggi-do|    Siheung-si|                    8.86|
|     Gyeonggi-do|   Hwaseong-si|                    8.58|
|           Ulsan|        Buk-gu|                    7.69|
|Gyeongsangbuk-do|   Uiseong-gun|                   40.26|
|    Jeollanam-do|   Goheung-gun|                   40.04|
|Gyeongsangbuk-do|     Gunwi-gun|                   38.87|
|Gyeongsangnam-do|  Hapcheon-gun|                   38.44|
|    Jeollanam-do|   Boseong-gun|                   37.4

In [17]:
query = '''select city, avg(elderly_population_ratio) from region group by city having avg(elderly_population_ratio) > 10'''
spark.sql(query).show()

+--------------+---------------------------------------------+
|          city|avg(CAST(elderly_population_ratio AS DOUBLE))|
+--------------+---------------------------------------------+
|       Naju-si|                                        22.16|
|    Gangnam-gu|                                        13.17|
|     Songpa-gu|                                         13.1|
|    Jangsu-gun|                                        32.87|
|  Hapcheon-gun|                                        38.44|
|   Bupyeong-gu|                                        13.83|
|      Gimpo-si|                                         12.1|
|    Cheonan-si|                                        10.42|
|   Michuhol-gu|                                        16.29|
|     Anyang-si|                                        12.88|
|     Gwanak-gu|                                        15.12|
|    Gijang-gun|                                        15.45|
|     Icheon-si|                                       

In [18]:
# cluster by ---> it will sort the data on each partition 
query = '''select city, elderly_population_ratio from region'''
spark.sql(query).show()

+-------------+------------------------+
|         city|elderly_population_ratio|
+-------------+------------------------+
|        Seoul|                   15.38|
|   Gangnam-gu|                   13.17|
|  Gangdong-gu|                   14.55|
|   Gangbuk-gu|                   19.49|
|   Gangseo-gu|                   14.39|
|    Gwanak-gu|                   15.12|
|  Gwangjin-gu|                   13.75|
|      Guro-gu|                   16.21|
| Geumcheon-gu|                   16.15|
|     Nowon-gu|                    15.4|
|    Dobong-gu|                   17.89|
|Dongdaemun-gu|                   17.26|
|   Dongjak-gu|                   15.85|
|      Mapo-gu|                   14.05|
| Seodaemun-gu|                   16.77|
|    Seocho-gu|                   13.39|
| Seongdong-gu|                   14.76|
|  Seongbuk-gu|                   16.15|
|    Songpa-gu|                    13.1|
| Yangcheon-gu|                   13.55|
+-------------+------------------------+
only showing top

In [19]:
query = '''select city, elderly_population_ratio from region cluster by elderly_population_ratio'''
spark.sql(query).show()

+--------------+------------------------+
|          city|elderly_population_ratio|
+--------------+------------------------+
|   Hwaseong-si|                    8.58|
|    Hwasun-gun|                   26.14|
|        Nam-gu|                   11.42|
|  Chuncheon-si|                    17.1|
|    Suseong-gu|                      15|
|      Gimje-si|                   30.89|
|   Seongju-gun|                   30.89|
|Yangpyeong-gun|                   24.66|
|  Goryeong-gun|                   30.17|
|   Goseong-gun|                   30.17|
|   Gangjin-gun|                   33.87|
|        Nam-gu|                   22.49|
|  Gokseong-gun|                   35.16|
| Jangseong-gun|                   28.75|
|   Hamyang-gun|                   32.65|
|    Dongnae-gu|                   17.53|
|         Busan|                   18.41|
|    Namdong-gu|                   12.68|
|     Sangju-si|                   30.09|
|       Dong-gu|                   21.62|
+--------------+------------------

In [20]:
# distributed by ----> it will not sort the data as cluster by but it will cluster the same values together
query = '''select province, city, elderly_alone_ratio  from region distribute by province, city'''
spark.sql(query).show()

+-----------------+---------------+-------------------+
|         province|           city|elderly_alone_ratio|
+-----------------+---------------+-------------------+
|      Gyeonggi-do|       Gimpo-si|                4.4|
|          Gwangju|         Buk-gu|                6.9|
|     Jeollanam-do|       Yeosu-si|                9.5|
| Gyeongsangnam-do|       Jinju-si|                8.6|
|     Jeollanam-do|     Haenam-gun|                 20|
|            Seoul|     Gangnam-gu|                4.3|
|            Seoul|        Guro-gu|                5.7|
| Gyeongsangbuk-do|   Mungyeong-si|               17.1|
|          Incheon|      Yeonsu-gu|                  4|
|Chungcheongbuk-do|Jeungpyeong-gun|                8.7|
|      Gyeonggi-do|       Hanam-si|                4.8|
|       Gangwon-do|       Inje-gun|               10.4|
|     Jeollanam-do|    Goheung-gun|               24.5|
|     Jeollanam-do| Yeonggwang-gun|               17.3|
| Gyeongsangnam-do|Changnyeong-gun|             

In [21]:
df1 = spark.read.csv('student_marks.csv', header=True)
df1.createOrReplaceTempView('marks')
query = '''select * from marks'''
spark.sql(query).show()

+--------+------+----------+-----+-------+---------+-------+-------+---------+-------+------+
|     _c0|Gender|       DOB|Maths|Physics|Chemistry|English|Biology|Economics|History|Civics|
+--------+------+----------+-----+-------+---------+-------+-------+---------+-------+------+
|    John|     M|05/04/1988|   55|     45|       56|     87|     21|       52|     89|    65|
|  Suresh|     M|  4/5/1987|   75|     55|     null|     64|     90|       61|     58|     2|
|  Ramesh|     M| 25/5/1989|   25|     54|       89|     76|     95|       87|     56|    74|
| Jessica|     F| 12/8/1990|   78|     55|       86|     63|     54|       89|     75|    45|
|Jennifer|     F|  2/9/1989|   58|     96|       78|     46|     96|       77|     83|    53|
+--------+------+----------+-----+-------+---------+-------+-------+---------+-------+------+



In [37]:
# # case clause 
# query = '''select _c0, Maths case when Maths > 40 then 'pass' else 'fail' end from marks'''
# spark.sql(query).show()

In [22]:
query  = '''select first(Maths), last(Maths) from marks'''
spark.sql(query).show()

+------------+-----------+
|first(Maths)|last(Maths)|
+------------+-----------+
|          55|         58|
+------------+-----------+



In [23]:
query = '''select province, sum(confirmed) as total_confirmed from cases group by province order by sum(confirmed) desc'''
spark.sql(query).show()

+-----------------+---------------+
|         province|total_confirmed|
+-----------------+---------------+
|            Daegu|         6680.0|
| Gyeongsangbuk-do|         1324.0|
|            Seoul|         1280.0|
|      Gyeonggi-do|         1000.0|
|          Incheon|          202.0|
|Chungcheongnam-do|          158.0|
|            Busan|          156.0|
| Gyeongsangnam-do|          132.0|
|          Daejeon|          131.0|
|       Gangwon-do|           62.0|
|Chungcheongbuk-do|           60.0|
|            Ulsan|           51.0|
|           Sejong|           49.0|
|          Gwangju|           43.0|
|     Jeollanam-do|           25.0|
|     Jeollabuk-do|           23.0|
|          Jeju-do|           19.0|
+-----------------+---------------+



In [24]:
query = '''SELECT /*+ REPARTITION(5) */ * FROM cases'''
spark.sql(query).show(1)

+--------+--------+---------------+-----+--------------------+---------+--------+---------+
| case_id|province|           city|group|      infection_case|confirmed|latitude|longitude|
+--------+--------+---------------+-----+--------------------+---------+--------+---------+
| 1500007| Daejeon|from other city| TRUE|Seosan-si Laboratory|        2|       -|        -|
+--------+--------+---------------+-----+--------------------+---------+--------+---------+
only showing top 1 row



In [25]:
# table sample 

query = '''select * from cases TABLESAMPLE (50 percent)'''
result = spark.sql(query)
result.show(), result.count()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000003|   Seoul|        Guro-gu| TRUE| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| TRUE|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000006|   Seoul|        Guro-gu| TRUE|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000010|   Seoul|      Gwanak-gu| TRUE|     Wangsung Church|       30|37.481735|126.930121|
| 1000011|   Seoul|   Eunpyeong-gu| TRUE|Eunpyeong St. Mar...|       14| 37.63369|  126.9165|
| 1000012|   Seoul|   Seongdong-gu| TRUE|    Seongdong-gu APT|       13| 37.55713|  127.0403|
| 1000014|   Seoul|     Gangnam-gu| TRUE|Samsung Medical C..

(None, 85)

In [26]:
query = '''select count(*) from cases'''
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|     174|
+--------+



In [27]:
# Rank 
query = '''select province, city, confirmed, RANK() over (partition by province order by confirmed) as rank from cases'''
spark.sql(query).show()

+-----------------+---------------+---------+----+
|         province|           city|confirmed|rank|
+-----------------+---------------+---------+----+
|           Sejong|from other city|        1|   1|
|           Sejong|              -|        1|   1|
|           Sejong|              -|        3|   3|
|           Sejong|         Sejong|       31|   4|
|           Sejong|              -|        5|   5|
|           Sejong|         Sejong|        8|   6|
|            Ulsan|from other city|       16|   1|
|            Ulsan|              -|       25|   2|
|            Ulsan|              -|        3|   3|
|            Ulsan|              -|        7|   4|
|Chungcheongbuk-do|     Goesan-gun|       11|   1|
|Chungcheongbuk-do|              -|       11|   1|
|Chungcheongbuk-do|              -|       13|   3|
|Chungcheongbuk-do|from other city|        2|   4|
|Chungcheongbuk-do|from other city|        6|   5|
|Chungcheongbuk-do|              -|        8|   6|
|Chungcheongbuk-do|from other c

In [28]:
# Dense rank
query = '''select province, city, confirmed, DENSE_RANK() over (partition by province order by confirmed) as rank from cases'''
spark.sql(query).show()


+-----------------+---------------+---------+----+
|         province|           city|confirmed|rank|
+-----------------+---------------+---------+----+
|           Sejong|from other city|        1|   1|
|           Sejong|              -|        1|   1|
|           Sejong|              -|        3|   2|
|           Sejong|         Sejong|       31|   3|
|           Sejong|              -|        5|   4|
|           Sejong|         Sejong|        8|   5|
|            Ulsan|from other city|       16|   1|
|            Ulsan|              -|       25|   2|
|            Ulsan|              -|        3|   3|
|            Ulsan|              -|        7|   4|
|Chungcheongbuk-do|     Goesan-gun|       11|   1|
|Chungcheongbuk-do|              -|       11|   1|
|Chungcheongbuk-do|              -|       13|   2|
|Chungcheongbuk-do|from other city|        2|   3|
|Chungcheongbuk-do|from other city|        6|   4|
|Chungcheongbuk-do|              -|        8|   5|
|Chungcheongbuk-do|from other c

In [29]:
## Explain 

query = '''Explain EXTENDED select province, city, confirmed, DENSE_RANK() over 
(partition by province order by confirmed) as rank from cases'''
spark.sql(query).collect()


[Row(plan="== Parsed Logical Plan ==\n'Project ['province, 'city, 'confirmed, 'DENSE_RANK() windowspecdefinition('province, 'confirmed ASC NULLS FIRST, unspecifiedframe$()) AS rank#888]\n+- 'UnresolvedRelation [cases], [], false\n\n== Analyzed Logical Plan ==\nprovince: string, city: string, confirmed: string, rank: int\nProject [province#17, city#18, confirmed#21, rank#888]\n+- Project [province#17, city#18, confirmed#21, rank#888, rank#888]\n   +- Window [dense_rank(confirmed#21) windowspecdefinition(province#17, confirmed#21 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#888], [province#17], [confirmed#21 ASC NULLS FIRST]\n      +- Project [province#17, city#18, confirmed#21]\n         +- SubqueryAlias cases\n            +- Relation[ case_id#16,province#17,city#18,group#19,infection_case#20,confirmed#21,latitude#22,longitude#23] csv\n\n== Optimized Logical Plan ==\nWindow [dense_rank(confirmed#21) windowspecdefinition(province#17, conf

In [30]:
from pyspark.sql.functions import udf 
from pyspark.sql.types import *

def toint(x):
    if isinstance(x, str) == True:
        x1 = [str(ord(i)) for i in x]
        return (int(''.join(x)))
    else:
        return Null
to_int = udf(lambda z: toint(z), IntegerType())
spark.udf.register("to_int", to_int)

<function __main__.<lambda>>

In [31]:
query = '''describe cases'''
spark.sql(query).show()

+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|       case_id|   string|   null|
|      province|   string|   null|
|          city|   string|   null|
|         group|   string|   null|
|infection_case|   string|   null|
|     confirmed|   string|   null|
|      latitude|   string|   null|
|     longitude|   string|   null|
+--------------+---------+-------+



In [32]:
query = '''select province, city, to_int(confirmed) from cases'''
res1 = spark.sql(query)
res1.printSchema()

root
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- to_int(confirmed): integer (nullable = true)



In [34]:
# query = '''select dayofyear(DOB) from marks'''
# spark.sql(query).show()

In [37]:
# query = '''select bit_and(to_int(confirmed)) from cases'''
# spark.sql(query).show()

In [41]:
# collect list 
query = '''select collect_list(Maths), collect_set(Physics) from marks'''
spark.sql(query).show()

+--------------------+--------------------+
| collect_list(Maths)|collect_set(Physics)|
+--------------------+--------------------+
|[55, 75, 25, 78, 58]|    [55, 45, 54, 96]|
+--------------------+--------------------+



In [42]:
# count distint
query = '''select count(DISTINCT province) from cases'''
spark.sql(query).show()

+------------------------+
|count(DISTINCT province)|
+------------------------+
|                      17|
+------------------------+



In [43]:
# first_value
query = '''select first_value(Maths), first_value(Maths, true), last_value(Maths), last_value(Maths, true) from marks'''
spark.sql(query).show()

+------------------+------------------+-----------------+-----------------+
|first_value(Maths)|first_value(Maths)|last_value(Maths)|last_value(Maths)|
+------------------+------------------+-----------------+-----------------+
|                55|                55|               58|               58|
+------------------+------------------+-----------------+-----------------+



In [46]:
# varience, varience sample, standard deviation
query = '''select variance(Maths), var_samp(Maths), stddev(Maths) from marks'''
spark.sql(query).show()

+-------------------------------+-------------------------------+-----------------------------+
|variance(CAST(Maths AS DOUBLE))|var_samp(CAST(Maths AS DOUBLE))|stddev(CAST(Maths AS DOUBLE))|
+-------------------------------+-------------------------------+-----------------------------+
|                          446.7|                          446.7|           21.135278564523347|
+-------------------------------+-------------------------------+-----------------------------+

