<a href="https://colab.research.google.com/github/rizqinugroho/learning-hadoop/blob/main/learning-spark/DataAnalytic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Spark ditulis dalam bahasa Scala dan membutuhkan Java Virtual Machine (JVM) untuk bisa berjalan. Maka dari itu, kita membutuhkan java untuk dia bisa berjalan diatasnya

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz

set the ‘environment’ path.

In [65]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Lalu, kita. butuh menginstall **Findspark** library yang akan mencari spark dalam sistem dan akan menginstallnya sebagai regular library.  

In [6]:
!pip install -q findspark
import findspark

#findspark.init({spark_home})
findspark.init("/content/spark-3.2.1-bin-hadoop2.7/")

Sekarang, kita bisa import SparkSession dari pyspark.sql and membuat  SparkSession, yang merupakan pintu gerbang to Spark.

In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

**StringType**

In [20]:
from pyspark.sql.types import StringType
strType = StringType()
strType = "hello bro"
print(strType)

hello bro


**Membuat PySpark ArrayType Column menggunakan StructType**

In [9]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

from pyspark.sql.types import StringType, ArrayType,StructType,StructField
schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True)
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



**Funsi Explode()**

In [10]:
from pyspark.sql.functions import explode
df.select(df.name,explode(df.languagesAtSchool)).show()

+----------------+------+
|            name|   col|
+----------------+------+
|    James,,Smith|  Java|
|    James,,Smith| Scala|
|    James,,Smith|   C++|
|   Michael,Rose,| Spark|
|   Michael,Rose,|  Java|
|   Michael,Rose,|   C++|
|Robert,,Williams|CSharp|
|Robert,,Williams|    VB|
+----------------+------+



**Split() function**

In [12]:
from pyspark.sql.functions import split
df.select(split(df.name,",").alias("nameAsArray")).show()
df.select(split(df.name,",").alias("nameAsArray")).printSchema()

+--------------------+
|         nameAsArray|
+--------------------+
|    [James, , Smith]|
|   [Michael, Rose, ]|
|[Robert, , Williams]|
+--------------------+

root
 |-- nameAsArray: array (nullable = true)
 |    |-- element: string (containsNull = true)



**Fungsi Array**

In [13]:
from pyspark.sql.functions import array
df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()


+----------------+--------+
|            name|  States|
+----------------+--------+
|    James,,Smith|[OH, CA]|
|   Michael,Rose,|[NY, NJ]|
|Robert,,Williams|[UT, NV]|
+----------------+--------+



**Array Contains**

In [14]:
from pyspark.sql.functions import array_contains
df.select(df.name,array_contains(df.languagesAtSchool,"Java")
    .alias("array_contains")).show()

+----------------+--------------+
|            name|array_contains|
+----------------+--------------+
|    James,,Smith|          true|
|   Michael,Rose,|          true|
|Robert,,Williams|         false|
+----------------+--------------+



In [None]:
df.select("title", "price", "year_written").show(5)

+----------------+-----+------------+
|           title|price|year_written|
+----------------+-----+------------+
|Northanger Abbey| 18.2|        1814|
|   War and Peace| 12.7|        1865|
|   Anna Karenina| 13.5|        1875|
|   Mrs. Dalloway| 25.0|        1925|
|       The Hours|12.35|        1999|
+----------------+-----+------------+
only showing top 5 rows



**MapType()**

In [21]:

from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(),StringType()),True)
])

dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'grey','eye':'grey'}),
        ('Jefferson',{'hair':'brown','eye':''})
        ]
df = spark.createDataFrame(data=dataDictionary, schema = schema)
df.printSchema()
df.show(truncate=False)


root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|name      |properties                   |
+----------+-----------------------------+
|James     |{eye -> brown, hair -> black}|
|Michael   |{eye -> null, hair -> brown} |
|Robert    |{eye -> black, hair -> red}  |
|Washington|{eye -> grey, hair -> grey}  |
|Jefferson |{eye -> , hair -> brown}     |
+----------+-----------------------------+



**Akses Element Map Type**

In [22]:
df3=df.rdd.map(lambda x: \
    (x.name,x.properties["hair"],x.properties["eye"])) \
    .toDF(["name","hair","eye"])
df3.printSchema()
df3.show()

root
 |-- name: string (nullable = true)
 |-- hair: string (nullable = true)
 |-- eye: string (nullable = true)

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



Mari kita gunakan cara lain untuk mendapatkan nilai kunci dari Peta menggunakan getItem()tipe Column, metode ini mengambil kunci sebagai argumen dan mengembalikan nilai.

In [23]:
df.withColumn("hair",df.properties.getItem("hair")) \
  .withColumn("eye",df.properties.getItem("eye")) \
  .drop("properties") \
  .show()

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



**maptype explode()**

In [24]:
from pyspark.sql.functions import explode
df.select(df.name,explode(df.properties)).show()

+----------+----+-----+
|      name| key|value|
+----------+----+-----+
|     James| eye|brown|
|     James|hair|black|
|   Michael| eye| null|
|   Michael|hair|brown|
|    Robert| eye|black|
|    Robert|hair|  red|
|Washington| eye| grey|
|Washington|hair| grey|
| Jefferson| eye|     |
| Jefferson|hair|brown|
+----------+----+-----+



**map_keys()**

In [25]:
from pyspark.sql.functions import map_keys
df.select(df.name,map_keys(df.properties)).show()

+----------+--------------------+
|      name|map_keys(properties)|
+----------+--------------------+
|     James|         [eye, hair]|
|   Michael|         [eye, hair]|
|    Robert|         [eye, hair]|
|Washington|         [eye, hair]|
| Jefferson|         [eye, hair]|
+----------+--------------------+



**map_values()**

In [26]:

from pyspark.sql.functions import map_values
df.select(df.name,map_values(df.properties)).show()

+----------+----------------------+
|      name|map_values(properties)|
+----------+----------------------+
|     James|        [brown, black]|
|   Michael|         [null, brown]|
|    Robert|          [black, red]|
|Washington|          [grey, grey]|
| Jefferson|             [, brown]|
+----------+----------------------+



**datetype()**

In [32]:
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType



# Creation of a dummy dataframe:
df1 = spark.createDataFrame([("1991-11-20","1991-11-20","1991-11-20"), 
                            ("1992-11-20","1992-11-20","1993-11-20")], schema=['first', 'second', 'third'])

#using cast to convert from string to datetype
df = df1.withColumn('test', col('first').cast(DateType()))

df.show()
df.printSchema()

+----------+----------+----------+----------+
|     first|    second|     third|      test|
+----------+----------+----------+----------+
|1991-11-20|1991-11-20|1991-11-20|1991-11-20|
|1992-11-20|1992-11-20|1993-11-20|1992-11-20|
+----------+----------+----------+----------+

root
 |-- first: string (nullable = true)
 |-- second: string (nullable = true)
 |-- third: string (nullable = true)
 |-- test: date (nullable = true)



**Timestamp**

In [46]:
from datetime import datetime
from pyspark.sql.functions import col, to_timestamp
df = spark.createDataFrame(
    [("1997-02-28 10:30:00",), ("1997-02-28 10:33:00",), ("1997-02-28 10:35:00",)], 
    ['date_str'])

converted_df = df.select(to_timestamp(df.date_str, 'yyyy-MM-dd HH:mm:ss')).alias('dt_col')


converted_df.show()
converted_df.printSchema()

+-------------------------------------------+
|to_timestamp(date_str, yyyy-MM-dd HH:mm:ss)|
+-------------------------------------------+
|                        1997-02-28 10:30:00|
|                        1997-02-28 10:33:00|
|                        1997-02-28 10:35:00|
+-------------------------------------------+

root
 |-- to_timestamp(date_str, yyyy-MM-dd HH:mm:ss): timestamp (nullable = true)



**Boolean**

In [80]:
# list  of students  data
data = [["1", "Ryan", "DU"],
        ["2", "Fina", "DU"],
        ["3", "Imam", "BHU"],
        ["4", "Bagas", "LPU"],
        ["1", "Budi", "KLMP"],
        ["5", "Ani", "IIT"]]
  
# specify column names
columns = ['student_ID', 'student_NAME', 'college']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)

#and using &
dataframe.filter((dataframe.college == "DU") &
                 (dataframe.student_ID == "1")).show()
#or using |
dataframe.filter((dataframe.college == "DU") |
                 (dataframe.student_ID == "1")).show()

+----------+------------+-------+
|student_ID|student_NAME|college|
+----------+------------+-------+
|         1|        Ryan|     DU|
+----------+------------+-------+

+----------+------------+-------+
|student_ID|student_NAME|college|
+----------+------------+-------+
|         1|        Ryan|     DU|
|         2|        Fina|     DU|
|         1|        Budi|   KLMP|
+----------+------------+-------+



**Number**

In [58]:
df = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "Number"])
sum_value = df.groupBy().sum()

sum_value.show()
sum_value.printSchema()

+-----------+
|sum(Number)|
+-----------+
|        130|
+-----------+

root
 |-- sum(Number): long (nullable = true)



**Aggregate - Group By**

In [64]:
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema = schema)

#group by
df_group_by = df.groupby('department')

#max salary
print("====Max salary ===")
df_group_by.max().show()

#sum salary
print("====sum salary ===")
df_group_by.sum().show()


#avg salary
print("====avg salary ===")
df_group_by.avg().show()

====Max salary ===
+----------+-----------+
|department|max(salary)|
+----------+-----------+
|     Sales|       4600|
|   Finance|       3900|
| Marketing|       3000|
+----------+-----------+

====sum salary ===
+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|      18800|
|   Finance|      10200|
| Marketing|       5000|
+----------+-----------+

====avg salary ===
+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|     Sales|     3760.0|
|   Finance|     3400.0|
| Marketing|     2500.0|
+----------+-----------+



**JOIN Statement**

In [79]:
emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

#Inner Join 
print("===Inner Join Example ===")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

#Full Outer Join
print("===Full Outer Join Example ===")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)

#Left Outer Join
print("===Left Outer Join Example ===")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter").show(truncate=False)

#Right Outer Join
print("===Right Outer Join Example ===")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"rightouter").show(truncate=False)






root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----