<a href="https://colab.research.google.com/github/sabbah128/PySpark_Colab/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6fd5349b9a9a29773d1e990950057d10640902c6ce5afedf26e2931d756e4bdd
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Count Words

In [None]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Kian_Spark").getOrCreate()
spark

In [None]:
dir(spark.read)

In [None]:
txt = spark.read.text('/content/test_pyspark.txt')
txt.printSchema()
txt.show(5, truncate=50)

root
 |-- value: string (nullable = true)

+--------------------------------------------------+
|                                             value|
+--------------------------------------------------+
|Machine learning diagnosis of active Juvenile I...|
|                                                  |
|                                                  |
|                                                  |
|                                   ARTICLEINFO\t\t|
+--------------------------------------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import split, col


lines_split = txt.select(split(col('value'), " ").alias("split_value"))
lines_split.printSchema()
lines_split.show(10, truncate=70)

root
 |-- split_value: array (nullable = true)
 |    |-- element: string (containsNull = false)

+----------------------------------------------------------------------+
|                                                           split_value|
+----------------------------------------------------------------------+
|[Machine, learning, diagnosis, of, active, Juvenile, Idiopathic, Ar...|
|                                                                    []|
|                                                                    []|
|                                                                    []|
|                                                     [ARTICLEINFO\t\t]|
|                                                            [ABSTRACT]|
|                                                                    []|
|                                                           [Keywords:]|
|                                    [Convolutional, Neural, Network, ]|
|                          

In [None]:
# from pyspark.sql.functions import col, size


# (lines_split.select(size(col("split_value"))
#                     .alias("length"))
#                     .groupby("length")
#                     .count()).orderBy(col("count").desc()).show()

In [None]:
import  pyspark.sql.functions as F


explode_words = lines_split.select(F.explode(F.col("split_value")).alias("word"))
explode_words.printSchema()
explode_words.show(20)

root
 |-- word: string (nullable = false)

+---------------+
|           word|
+---------------+
|        Machine|
|       learning|
|      diagnosis|
|             of|
|         active|
|       Juvenile|
|     Idiopathic|
|      Arthritis|
|             on|
|          blood|
|           pool|
|           [99M|
|            Tc]|
|         Tc-MDP|
|   scintigraphy|
|        images.|
|               |
|               |
|               |
|ARTICLEINFO\t\t|
+---------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import lower, col


word_lowers = explode_words.select(lower(col("word")).alias("word_lower")).filter(col("word_lower") != "")
word_lowers.show()

+---------------+
|     word_lower|
+---------------+
|        machine|
|       learning|
|      diagnosis|
|             of|
|         active|
|       juvenile|
|     idiopathic|
|      arthritis|
|             on|
|          blood|
|           pool|
|           [99m|
|            tc]|
|         tc-mdp|
|   scintigraphy|
|        images.|
|articleinfo\t\t|
|       abstract|
|      keywords:|
|  convolutional|
+---------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import regexp_extract, col


words_regexp = word_lowers.select(regexp_extract(col("word_lower"), "[a-z0-9_-]+", 0).alias("word_regexp"))
words_regexp.printSchema()
words_regexp.show(20)

root
 |-- word_regexp: string (nullable = false)

+-------------+
|  word_regexp|
+-------------+
|      machine|
|     learning|
|    diagnosis|
|           of|
|       active|
|     juvenile|
|   idiopathic|
|    arthritis|
|           on|
|        blood|
|         pool|
|          99m|
|           tc|
|       tc-mdp|
| scintigraphy|
|       images|
|  articleinfo|
|     abstract|
|     keywords|
|convolutional|
+-------------+
only showing top 20 rows



In [None]:
results = (words_regexp.groupby(col("word_regexp")).count()).orderBy(col("count").desc())
# results = results.orderBy(col("count").desc())
results.write.csv('simple_count.csv', header=True)
results.show(10)

+-----------+-----+
|word_regexp|count|
+-----------+-----+
|        and|  100|
|        the|   94|
|         of|   77|
|         in|   75|
|         to|   34|
|        for|   32|
|      model|   24|
|          a|   24|
|     images|   22|
|      ankle|   21|
+-----------+-----+
only showing top 10 rows



In [None]:
num_count = results.groupBy(col('count')).count().toDF(*["num_word", "count"]).orderBy(col('count').desc())

num_count.write.csv('renamed_num_count.csv', header=True)
num_count.show(10)

+--------+-----+
|num_word|count|
+--------+-----+
|       1|  569|
|       2|  116|
|       3|   57|
|       4|   26|
|       5|   21|
|       6|   15|
|       8|   14|
|       7|   14|
|      10|    7|
|      11|    5|
+--------+-----+
only showing top 10 rows



## Count words all together

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


spark = SparkSession.builder.appName("Kian_Spark").getOrCreate()

results = (
    spark.read.text("/content/test_pyspark.txt")
    .select(F.explode(F.split(F.lower(F.col("value")), " ")).alias("word"))
    .select(F.regexp_replace(F.col("word"), "[^a-z_-]", "").alias("word"))
    .where(F.col("word") != "")
    .groupby("word")
    .count()
    .orderBy(F.col("count").desc())
    .write.csv('word_coun1000t.csv', header=True)
)

## Exercises

In [None]:
from pyspark.sql.functions import col, length


# The `length` function returns the number of characters in a string column.
exo2_3_df = (spark.read.text("/content/README.md")
                            .select(length(col("value")).alias("number_of_char"))).show(10)
                            # .select(col("value").alias("number_of_char")))
                            # .withColumnRenamed("length(value)", "number_of_char"))


+--------------+
|number_of_char|
+--------------+
|            14|
|             0|
|            80|
|            75|
|            73|
|            74|
|            98|
|            47|
|             0|
|            27|
+--------------+
only showing top 10 rows



In [None]:
from pyspark.sql.functions import col, greatest
from pyspark.errors import AnalysisException


exo2_4_df = spark.createDataFrame([["key", 10_000, 20_000]], ["key", "value1", "value2"])
exo2_4_df.printSchema()

try:
    exo2_4_mod = exo2_4_df.select(greatest(col("value1"), col("value2")).alias("max_value")).show()
except AnalysisException as err:
    print('Exception Error!')

root
 |-- key: string (nullable = true)
 |-- value1: long (nullable = true)
 |-- value2: long (nullable = true)

+---------+
|max_value|
+---------+
|    20000|
+---------+



In [None]:
# Learn coding
LETTERS = slice (0, 2)
NUMS = slice (2, 6)
CITY = slice (6, None)

code_1 = 'LH1234 BLN'
code_2 = 'LH5678 MUC'

print(code_1[LETTERS], code_1[NUMS], code_1[CITY])
print(code_2[LETTERS], code_2[NUMS], code_2[CITY])

LH 1234  BLN
LH 5678  MUC


## Tabular Data

In [None]:
!wget https://applications.crtc.gc.ca/OpenData/Television%20Logs/BroadcastLogs_2017_Q4_M10.CSV

--2024-03-23 10:45:40--  https://applications.crtc.gc.ca/OpenData/Television%20Logs/BroadcastLogs_2017_Q4_M10.CSV
Resolving applications.crtc.gc.ca (applications.crtc.gc.ca)... 198.103.61.42
Connecting to applications.crtc.gc.ca (applications.crtc.gc.ca)|198.103.61.42|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1037439024 (989M) [application/octet-stream]
Saving to: ‘BroadcastLogs_2017_Q4_M10.CSV’


2024-03-23 10:46:14 (30.1 MB/s) - ‘BroadcastLogs_2017_Q4_M10.CSV’ saved [1037439024/1037439024]



In [None]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Kian_Spark").getOrCreate()
spark

In [None]:
logs = spark.read.csv(('/content/BroadcastLogs_2017_Q4_M10.CSV'),
sep="|",
header=True,
inferSchema=True,
timestampFormat="yyyy-MM-dd",
)

logs.printSchema()
logs.show(5)

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable 

In [None]:
logs.select("BroadcastLogID", "LogServiceID", "LogDate", "Duration").show(5, False)
# logs.select(*[F.col("BroadCastLogID"), F.col("LogServiceID"), F.col("LogDate")])

+--------------+------------+----------+----------------+
|BroadcastLogID|LogServiceID|LogDate   |Duration        |
+--------------+------------+----------+----------------+
|989604515     |3157        |2017-10-01|02:00:02.0000000|
|989604516     |3157        |2017-10-01|00:00:30.0000000|
|989604517     |3157        |2017-10-01|00:00:15.0000000|
|989604518     |3157        |2017-10-01|00:00:15.0000000|
|989604519     |3157        |2017-10-01|00:00:15.0000000|
+--------------+------------+----------+----------------+
only showing top 5 rows



In [None]:
# import numpy as np


# column_split = np.array_split(np.array(logs.columns), len(logs.columns) // 3)

# for x in column_split:
#     logs.select(*x).show(5, False)

In [None]:
import pyspark.sql.functions as F


logs.select(
    F.col("Duration"),
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
# ).show(5)
).distinct().show(5)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:08:18.0000000|        0|          8|         18|
|00:10:37.0000000|        0|         10|         37|
|01:09:08.0000000|        1|          9|          8|
|00:26:41.0000000|        0|         26|         41|
|00:05:29.0000000|        0|          5|         29|
+----------------+---------+-----------+-----------+
only showing top 5 rows



In [None]:
logs.select(
    F.col("Duration"),
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ).alias("Duration_seconds"),
            ).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|00:53:55.0000000|            3235|
|00:10:30.0000000|             630|
|00:59:08.0000000|            3548|
|00:25:52.0000000|            1552|
|00:28:08.0000000|            1688|
+----------------+----------------+
only showing top 5 rows



In [None]:
# WARNING: If you create a column withColumn() and give it a name that
# already exists in your data frame, PySpark will happily overwrite the column.

logs_seconds = logs.withColumn(
    "Duration_seconds",
     (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
     ),
)

# logs = logs.withColumnRenamed("Duration_seconds", "duration_seconds")
# logs_seconds.toDF(*[x.lower() for x in logs_seconds.columns]).printSchema()

logs_seconds = logs_seconds.select(sorted(logs_seconds.columns))
logs_seconds.printSchema()
logs_seconds.show(5)

root
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- BroadcastLogID: integer (nullable = true)
 |-- BroadcastOriginPointID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CompositionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Duration_seconds: integer (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- Language1: integer (nullable = true)
 |-- Language2: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- Producer1:

In [None]:
logs_seconds.select("Duration_seconds").summary().show()

+-------+-----------------+
|summary| Duration_seconds|
+-------+-----------------+
|  count|          7096090|
|   mean|133.3823284372098|
| stddev|631.7670380125005|
|    min|                0|
|    25%|               15|
|    50%|               30|
|    75%|               30|
|    max|            86399|
+-------+-----------------+



## Join Tabels

In [None]:
print("""
[LEFT].join(
        [RIGHT],
        on=[PREDICATES]
        how=[METHOD]
)
""")


[LEFT].join(
        [RIGHT],
        on=[PREDICATES]
        how=[METHOD]
)



In [3]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Kian_Spark").getOrCreate()

SalesCustomers = spark.read.csv('/content/SalesCustomers.csv',
                                header=True,
                                inferSchema= True,
                                timestampFormat="yyyy-MM-dd",)
SalesCustomers.printSchema()
SalesCustomers.show(5)

SalesOrders = spark.read.csv('/content/SalesOrders.csv',
                             header=True,
                             inferSchema= True,
                             timestampFormat="yyyy-MM-dd",)
SalesOrders.printSchema()
SalesOrders.show(5)

root
 |-- custid: integer (nullable = true)
 |-- companyname: string (nullable = true)
 |-- contactname: string (nullable = true)
 |-- contacttitle: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postalcode: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)

+------+--------------+-----------------+--------------------+--------------------+-----------+------+----------+-------+--------------+--------------+
|custid|   companyname|      contactname|        contacttitle|             address|       city|region|postalcode|country|         phone|           fax|
+------+--------------+-----------------+--------------------+--------------------+-----------+------+----------+-------+--------------+--------------+
|     1|Customer NRZBB|   Allen, Michael|Sales Representative|     Obere Str. 0123|     Berlin|  NULL|

In [6]:
JoinOrderCustomer = SalesOrders.join(
        SalesCustomers,
        on= 'custid',
        how='inner'
)

JoinOrderCustomer.printSchema()
JoinOrderCustomer.show(5)

root
 |-- custid: integer (nullable = true)
 |-- orderid: integer (nullable = true)
 |-- empid: integer (nullable = true)
 |-- orderdate: string (nullable = true)
 |-- requireddate: string (nullable = true)
 |-- shippeddate: string (nullable = true)
 |-- shipperid: integer (nullable = true)
 |-- freight: double (nullable = true)
 |-- shipname: string (nullable = true)
 |-- shipaddress: string (nullable = true)
 |-- shipcity: string (nullable = true)
 |-- shipregion: string (nullable = true)
 |-- shippostalcode: integer (nullable = true)
 |-- shipcountry: string (nullable = true)
 |-- companyname: string (nullable = true)
 |-- contactname: string (nullable = true)
 |-- contacttitle: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postalcode: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)

+------+-------+

In [28]:
import pyspark.sql.functions as F


splitOrderCustomer = JoinOrderCustomer.withColumn(
    'Fname', F.split('contactname', ', ')[0]).withColumn(
        'Lname', F.split('contactname', ', ')[1]).withColumn(
                'company', F.split('companyname', ' ')[1]).withColumn(
                    'year', F.year(F.to_date('orderdate'))).withColumn(
                        'month', F.month(F.to_date('orderdate')))

splitOrderCustomer.show(5)

+------+-------+-----+--------------------+--------------------+--------------------+---------+-------+-----------------+--------------------+--------------+----------+--------------+-----------+--------------+--------------------+------------------+--------------------+--------------+------+----------+-------+-----------------+-----------------+--------------+---------+-------+----+-----+
|custid|orderid|empid|           orderdate|        requireddate|         shippeddate|shipperid|freight|         shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|   companyname|         contactname|      contacttitle|             address|          city|region|postalcode|country|            phone|              fax|         Fname|    Lname|company|year|month|
+------+-------+-----+--------------------+--------------------+--------------------+---------+-------+-----------------+--------------------+--------------+----------+--------------+-----------+--------------+----

In [31]:
selectData = splitOrderCustomer.select("Fname", "Lname", "phone", "country", "city", "freight", "company", "year", "month")
selectData.printSchema()
selectData.show(5)

root
 |-- Fname: string (nullable = true)
 |-- Lname: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- freight: double (nullable = true)
 |-- company: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+--------------+---------+-----------------+-------+--------------+-------+-------+----+-----+
|         Fname|    Lname|            phone|country|          city|freight|company|year|month|
+--------------+---------+-----------------+-------+--------------+-------+-------+----+-----+
|         McLin|   Nkenge|      56.78.90.12| France|         Reims|  32.38|  ENQZT|2006|    7|
|       Wickham|      Jim|      0251-456789|Germany|       Münster|  11.61|  FAPSM|2006|    7|
|         Cohen|      Shy|    (21) 789-0123| Brazil|Rio de Janeiro|  65.83|  IBVRG|2006|    7|
|Tuntisangaroon|Sittichai|      78.90.12.34| France|          Lyon|  41.34|  NRCSK|20