<a href="https://colab.research.google.com/github/sumaaithal/PySpark_30Days_Challenge/blob/main/pyspark32.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark

Get:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [662 kB]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,056 kB]
Get:13 http://archive.ubun

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [25]:
from pyspark.sql.functions import regexp_replace, when, translate, expr, overlay

In [4]:
address = [(1,"14851 Jeffrey Rd","DE"),
    (2,"43421 Margarita St","NY"),
    (3,"13111 Siemon Ave","CA")]

In [5]:
df = spark.createDataFrame(data=address,schema=["id","address","state"])

In [6]:
df.show()

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+



In [8]:
df.withColumn('address',
              regexp_replace("address",'Rd','Road')).show()

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|14851 Jeffrey Road|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+



In [12]:
df.withColumn("address",
              when(df.address.endswith("Rd"), regexp_replace("address","Rd","Road"))\
              .when(df.address.endswith("St"), regexp_replace("address","St","Street"))\
              .when(df.address.endswith("Ave"), regexp_replace("address","Ave","Avenue"))
              .otherwise(df.address)
              ).show()

+---+--------------------+-----+
| id|             address|state|
+---+--------------------+-----+
|  1|  14851 Jeffrey Road|   DE|
|  2|43421 Margarita S...|   NY|
|  3| 13111 Siemon Avenue|   CA|
+---+--------------------+-----+



In [14]:
stateDic = {'CA':'California','NY':'New York','DE':'Delaware'}

In [18]:
df2 = df.rdd.map(lambda x:(x.id, x.address, stateDic[x.state])).toDF(["id","addres","state"])

In [19]:
df2.show()

+---+------------------+----------+
| id|            addres|     state|
+---+------------------+----------+
|  1|  14851 Jeffrey Rd|  Delaware|
|  2|43421 Margarita St|  New York|
|  3|  13111 Siemon Ave|California|
+---+------------------+----------+



In [21]:
df.withColumn("address",
              translate("address","123","ABC")).show()

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  A485A Jeffrey Rd|   DE|
|  2|4C4BA Margarita St|   NY|
|  3|  ACAAA Siemon Ave|   CA|
+---+------------------+-----+



In [23]:
df = spark.createDataFrame(
   [("ABCDE_XYZ", "XYZ","FGH")],
    ("col1", "col2","col3")
  )

In [24]:
df.withColumn("new_column",
              expr("regexp_replace(col1,col2,col3)").alias("replaced_value")).show()

+---------+----+----+----------+
|     col1|col2|col3|new_column|
+---------+----+----+----------+
|ABCDE_XYZ| XYZ| FGH| ABCDE_FGH|
+---------+----+----+----------+



In [26]:
df = spark.createDataFrame([("ABCDE_XYZ", "FGH")], ("col1", "col2"))

In [27]:
df.select(overlay("col1","col2",7).alias("overlayed")).show()

+---------+
|overlayed|
+---------+
|ABCDE_FGH|
+---------+

