In [None]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz

In [None]:
!tar xf spark-3.4.2-bin-hadoop3.tgz

In [None]:
import os

# Point PySpark/findspark at the JVM installed by apt-get and at the Spark
# distribution unpacked by tar. SPARK_HOME is resolved against the kernel's
# working directory (where the `!tar` command extracted the archive) instead of
# a hard-coded "/content/..." path, so the notebook also runs outside Colab.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.4.2-bin-hadoop3")

In [None]:
!pip install -q findspark

In [None]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# findspark is already installed by an earlier cell, so it is not reinstalled here.
# findspark.init() locates Spark via the SPARK_HOME environment variable and
# makes that distribution's Python package importable.
import findspark
findspark.init()



In [None]:
# pyspark is already installed by an earlier cell; rather than reinstalling it
# (unpinned), display the version that is actually imported so a mismatch with
# the downloaded Spark 3.4.2 distribution is easy to spot.
import pyspark
pyspark.__version__



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [None]:
# Create (or reuse) the SparkSession for this notebook.
# Hive support is enabled because the data-setup cell persists the DataFrames
# with saveAsTable so they can be recovered after the session is shut down;
# without it Spark uses an in-memory catalog and those table definitions are lost.
# NOTE(review): with no hive-site.xml this creates a local Derby metastore
# (metastore_db/) in the working directory.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [None]:
# Build the sample DataFrames.
# Employee rows are (id, name, dept, salary); several rows deliberately contain
# None so the null-handling cells further down have something to work on.
emp = [
    (1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500),
]

# Department rows are (id, name). "dept4" has no employees, and "dept5"
# (used by employee 10) has no department row.
dept = [
    ("dept1", "Department - 1"),
    ("dept2", "Department - 2"),
    ("dept3", "Department - 3"),
    ("dept4", "Department - 4"),
]

df = spark.createDataFrame(data=emp, schema=["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(data=dept, schema=["id", "name"])

# Register session-scoped temporary views so the data can also be queried with SQL.
df.createOrReplaceTempView("empdf")
deptdf.createOrReplaceTempView("deptdf")

# Also save them as managed tables so they can be recovered if the session is
# shut down. NOTE(review): that only holds with a persistent catalog (e.g. a
# session built with enableHiveSupport()); otherwise table metadata lives in an
# in-memory catalog.
df.write.mode("overwrite").saveAsTable("hive_empdf")
deptdf.write.mode("overwrite").saveAsTable("hive_deptdf")

In [None]:
# Inspect the auto-broadcast join threshold: tables whose estimated size is
# below it are broadcast to the executors instead of being shuffled.
# spark.conf.get returns a Spark byte-size string ("10485760b" for the default);
# a user-set value may instead be plain bytes, carry a unit ("10MB", "50m"),
# or be "-1", which disables automatic broadcasting.
_BYTE_UNITS = {
    "": 1, "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
    "t": 1024 ** 4, "tb": 1024 ** 4,
    "p": 1024 ** 5, "pb": 1024 ** 5,
}


def _spark_bytes(value):
    """Parse a Spark byte-size string such as '10485760b', '10MB' or '-1' into bytes.

    Args:
        value: The configuration string as returned by ``spark.conf.get``.

    Returns:
        int: The size in bytes (negative when the setting is disabled).

    Raises:
        ValueError: If the unit suffix or the numeric part is not recognised.
    """
    text = value.strip().lower()
    number = text.rstrip("bkmgtp")
    unit = text[len(number):]
    if unit not in _BYTE_UNITS:
        raise ValueError("Unrecognised byte-size value: {0!r}".format(value))
    return int(number) * _BYTE_UNITS[unit]


size_str = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
size_bytes = _spark_bytes(size_str)

# Convert bytes to MB (MiB) for display.
size = size_bytes / (1024 * 1024)

if size_bytes < 0:
    print("Automatic broadcast joins are disabled (threshold = {0}).".format(size_str))
else:
    print("Default size of broadcast table is {0} MB.".format(size))


Default size of broadcast table is 10.0 MB.


In [None]:
# Raise the auto-broadcast threshold to 50 MiB (the value is in bytes) so that
# somewhat larger tables can still be broadcast in joins.
spark.conf.set(
    "spark.sql.autoBroadcastJoinThreshold",
    50 * 1024 ** 2,
)

In [None]:
# Caching: cache() marks df for storage at Spark's default level, and count()
# is an action that actually materialises the cache. Then report which storage
# tiers that level uses.
df.cache().count()
print(f"Memory Used : {df.storageLevel.useMemory}")
print(f"Disk Used : {df.storageLevel.useDisk}")

Memory Used : True
Disk Used : True


In [None]:
from pyspark.storagelevel import StorageLevel

In [None]:
# Persist the department DataFrame with an explicit MEMORY_ONLY level (no disk),
# run an action to materialise it, and report the storage level of *deptdf*
# (the original printed df's level by mistake, which hid the difference).
deptdf.persist(StorageLevel.MEMORY_ONLY)
deptdf.count()
print("Memory Used : {0}".format(deptdf.storageLevel.useMemory))
print("Disk Used : {0}".format(deptdf.storageLevel.useDisk))


Memory Used : True
Disk Used : True


In [None]:
# Release the cached data now that the caching demo is over. deptdf was also
# persisted (MEMORY_ONLY) in the previous cell, so free it as well;
# df.unpersist() stays last so the cell still displays the DataFrame.
deptdf.unpersist()
df.unpersist()

DataFrame[id: bigint, name: string, dept: string, salary: bigint]

In [None]:
# SQL CASE expression that buckets `salary` into a level label. WHEN clauses
# are evaluated in order, so each branch only needs its lower bound; NULL or
# non-positive salaries fall through to ELSE. The trailing alias lets the same
# string be used both with expr(...) and with selectExpr(...).
condicion = """case
                   when salary > 5000 then 'high_salary'
                   when salary > 2000 then 'mid_salary'
                   when salary > 0 then 'low_salary'
                   else 'invalid_salary'
               end as salary_level"""

# Add the salary level column: expr() parses the SQL CASE string into a Column.
newdf = df.withColumn(colName="salary_level", col=expr(condicion))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



In [None]:
# Same salary bucketing using selectExpr: "*" keeps every existing column, and
# `condicion` (a SQL CASE expression ending in "as salary_level") adds the new one.
newdf = df.selectExpr("*", condicion)
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



In [None]:
def detSalary_Level(sal):
    """Classify a salary into a level label.

    Mirrors the SQL CASE expression in ``condicion`` so the UDF-based and the
    expression-based versions agree:

    * ``> 5000`` -> ``'high_salary'``
    * ``> 2000`` -> ``'mid_salary'``
    * ``> 0``    -> ``'low_salary'``
    * anything else, including ``None`` -> ``'invalid_salary'``

    Args:
        sal: Salary value (int/float), or None when the column value is NULL.

    Returns:
        str: The salary level label.
    """
    # Spark passes SQL NULL to a Python UDF as None; `None > 5000` raises
    # TypeError in Python 3, whereas the SQL CASE falls through to ELSE.
    if sal is None:
        return 'invalid_salary'
    if sal > 5000:
        return 'high_salary'
    if sal > 2000:
        return 'mid_salary'
    if sal > 0:
        return 'low_salary'
    return 'invalid_salary'

In [None]:
# Wrap detSalary_Level as a Spark UDF that returns a string column.
sal_level = udf(f=detSalary_Level, returnType=StringType())

In [None]:
# Apply the UDF to each row's salary to derive the salary_level column.
newdf = df.withColumn("salary_level", sal_level(df["salary"]))
newdf.show()

+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+



In [None]:
# Working with NULL values: keep only the rows whose dept is NULL
# (where() is an alias of filter()).
newdf = df.where(df.dept.isNull())
newdf.show()

+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|null|null|null|  7500|
|   9| III|null|  4500|
+----+----+----+------+



In [None]:
# Keep only the rows whose dept is not NULL.
newdf = df.where(df.dept.isNotNull())
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+



In [None]:
# Replace NULLs in the dept column (only) with the placeholder "INVALID".
newdf = df.na.fill("INVALID", subset=["dept"])
newdf.show()

+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  1500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  7200|
|   7| GGG|  dept3|  7100|
|null|null|INVALID|  7500|
|   9| III|INVALID|  4500|
|  10|null|  dept5|  2500|
+----+----+-------+------+



In [None]:
# Drop every row that has a NULL in any column (how="any" is the default).
newdf = df.na.drop(how="any")
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+



In [None]:
# Drop only rows where every column is NULL. Every sample row has a salary,
# so nothing is removed here.
newdf = df.na.drop(how="all")
newdf.show()

+----+----+-----+------+
|  id|name| dept|salary|
+----+----+-----+------+
|   1| AAA|dept1|  1000|
|   2| BBB|dept1|  1100|
|   3| CCC|dept1|  3000|
|   4| DDD|dept1|  1500|
|   5| EEE|dept2|  8000|
|   6| FFF|dept2|  7200|
|   7| GGG|dept3|  7100|
|null|null| null|  7500|
|   9| III| null|  4500|
|  10|null|dept5|  2500|
+----+----+-----+------+



In [None]:
# Drop rows whose dept column is NULL, ignoring NULLs in the other columns.
newdf = df.na.drop(subset=["dept"])
newdf.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+

