# Spark Dataframes and Operations Code

## Create Dataframe Operations

In [0]:
from pyspark.sql import SparkSession

# Create the Spark session (original note: this automatically uses local mode
# inside the container).
spark = (
    SparkSession.builder
    .appName("MiNotebookConSpark")
    .getOrCreate()
)

In [0]:
import pandas as pd
from datetime import datetime, date
from pyspark.sql import Row

# Build one Row per record from plain tuples; no schema is passed, so the
# column types are left for Spark to work out from the values.
data_df = spark.createDataFrame([
    Row(col_1=c1, col_2=c2, col_3=c3, col_4=c4, col_5=c5)
    for c1, c2, c3, c4, c5 in (
        (100, 200., 'string_test_1', date(2023, 1, 1), datetime(2023, 1, 1, 12, 0)),
        (200, 300., 'string_test_2', date(2023, 2, 1), datetime(2023, 1, 2, 12, 0)),
        (400, 500., 'string_test_3', date(2023, 3, 1), datetime(2023, 1, 3, 12, 0)),
    )
])


In [0]:
import pandas as pd
from datetime import datetime, date
from pyspark.sql import Row

# Same three Rows, this time with the column types spelled out in a schema string.
data_df = spark.createDataFrame(
    data=[
        Row(
            col_1=100, col_2=200., col_3='string_test_1',
            col_4=date(2023, 1, 1), col_5=datetime(2023, 1, 1, 12, 0),
        ),
        Row(
            col_1=200, col_2=300., col_3='string_test_2',
            col_4=date(2023, 2, 1), col_5=datetime(2023, 1, 2, 12, 0),
        ),
        Row(
            col_1=400, col_2=500., col_3='string_test_3',
            col_4=date(2023, 3, 1), col_5=datetime(2023, 1, 3, 12, 0),
        ),
    ],
    schema=' col_1 long, col_2 double, col_3 string, col_4 date, col_5 timestamp',
)


In [0]:
import pandas as pd
from datetime import datetime, date
from pyspark.sql import Row

# Assemble the sample data as a pandas DataFrame, one list per column.
pandas_df = pd.DataFrame({
    'col_1': [100, 200, 400],
    'col_2': [200., 300., 500.],
    'col_3': [f'string_test_{i}' for i in (1, 2, 3)],
    'col_4': [date(2023, month, 1) for month in (1, 2, 3)],
    'col_5': [datetime(2023, 1, day, 12, 0) for day in (1, 2, 3)],
})
# Hand the pandas DataFrame to Spark to get a Spark DataFrame.
df = spark.createDataFrame(pandas_df)


In [0]:
from datetime import datetime, date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build the records column-by-column and zip them into row tuples, so the
# DataFrame is created from a local list without going through sparkContext.
data = list(zip(
    [100, 200, 300],
    [200., 300., 400.],
    ['string_test_1', 'string_test_2', 'string_test_3'],
    [date(2023, 1, 1), date(2023, 2, 1), date(2023, 3, 1)],
    [datetime(2023, 1, 1, 12, 0), datetime(2023, 1, 2, 12, 0), datetime(2023, 1, 3, 12, 0)],
))

# Create the DataFrame directly from the list of tuples, naming the columns.
data_df = spark.createDataFrame(
    data,
    ['col_1', 'col_2', 'col_3', 'col_4', 'col_5'],
)

# Display the DataFrame.
data_df.show()

+-----+-----+-------------+----------+-------------------+
|col_1|col_2|        col_3|     col_4|              col_5|
+-----+-----+-------------+----------+-------------------+
|  100|200.0|string_test_1|2023-01-01|2023-01-01 12:00:00|
|  200|300.0|string_test_2|2023-02-01|2023-01-02 12:00:00|
|  300|400.0|string_test_3|2023-03-01|2023-01-03 12:00:00|
+-----+-----+-------------+----------+-------------------+



In [0]:
from datetime import datetime, date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Distribute the local tuples as an RDD, then turn the RDD into a DataFrame
# by supplying the column names.
rdd = spark.sparkContext.parallelize(
    [
        (100, 200., 'string_test_1', date(2023, 1, 1), datetime(2023, 1, 1, 12, 0)),
        (200, 300., 'string_test_2', date(2023, 2, 1), datetime(2023, 1, 2, 12, 0)),
        (300, 400., 'string_test_3', date(2023, 3, 1), datetime(2023, 1, 3, 12, 0)),
    ]
)
data_df = rdd.toDF(['col_1', 'col_2', 'col_3', 'col_4', 'col_5'])

## How to View the Dataframes

In [0]:
# Print the rows of data_df as a formatted text table.
data_df.show()

+-----+-----+-------------+----------+-------------------+
|col_1|col_2|        col_3|     col_4|              col_5|
+-----+-----+-------------+----------+-------------------+
|  100|200.0|string_test_1|2023-01-01|2023-01-01 12:00:00|
|  200|300.0|string_test_2|2023-02-01|2023-01-02 12:00:00|
|  300|400.0|string_test_3|2023-03-01|2023-01-03 12:00:00|
+-----+-----+-------------+----------+-------------------+



In [0]:
# Print only the first two rows.
data_df.show(n=2)

+-----+-----+-------------+----------+-------------------+
|col_1|col_2|        col_3|     col_4|              col_5|
+-----+-----+-------------+----------+-------------------+
|  100|200.0|string_test_1|2023-01-01|2023-01-01 12:00:00|
|  200|300.0|string_test_2|2023-02-01|2023-01-02 12:00:00|
+-----+-----+-------------+----------+-------------------+
only showing top 2 rows


In [0]:
# Print each column's name, type and nullability as a tree.
data_df.printSchema()

root
 |-- col_1: long (nullable = true)
 |-- col_2: double (nullable = true)
 |-- col_3: string (nullable = true)
 |-- col_4: date (nullable = true)
 |-- col_5: timestamp (nullable = true)



In [0]:
# Print the first row vertically: one "column | value" line per field.
data_df.show(n=1, vertical=True)

-RECORD 0--------------------
 col_1 | 100                 
 col_2 | 200.0               
 col_3 | string_test_1       
 col_4 | 2023-01-01          
 col_5 | 2023-01-01 12:00:00 
only showing top 1 row


In [0]:
# List the column names of data_df.
data_df.columns

['col_1', 'col_2', 'col_3', 'col_4', 'col_5']

In [0]:
# Count the rows in data_df.
data_df.count()

3

In [0]:
# Summary statistics (count, mean, stddev, min, max) for the three chosen columns.
data_df.describe('col_1', 'col_2', 'col_3').show()

+-------+-----+-----+-------------+
|summary|col_1|col_2|        col_3|
+-------+-----+-----+-------------+
|  count|    3|    3|            3|
|   mean|200.0|300.0|         NULL|
| stddev|100.0|100.0|         NULL|
|    min|  100|200.0|string_test_1|
|    max|  300|400.0|string_test_3|
+-------+-----+-----+-------------+



## Collecting the data

In [0]:
# Return all rows of data_df as a Python list of Row objects.
data_df.collect()

[Row(col_1=100, col_2=200.0, col_3='string_test_1', col_4=datetime.date(2023, 1, 1), col_5=datetime.datetime(2023, 1, 1, 12, 0)),
 Row(col_1=200, col_2=300.0, col_3='string_test_2', col_4=datetime.date(2023, 2, 1), col_5=datetime.datetime(2023, 1, 2, 12, 0)),
 Row(col_1=300, col_2=400.0, col_3='string_test_3', col_4=datetime.date(2023, 3, 1), col_5=datetime.datetime(2023, 1, 3, 12, 0))]

In [0]:
# Return the first row as a one-element list of Row objects.
data_df.take(1)

[Row(col_1=100, col_2=200.0, col_3='string_test_1', col_4=datetime.date(2023, 1, 1), col_5=datetime.datetime(2023, 1, 1, 12, 0))]

In [0]:
# Return the last row as a one-element list of Row objects.
data_df.tail(1)

[Row(col_1=300, col_2=400.0, col_3='string_test_3', col_4=datetime.date(2023, 3, 1), col_5=datetime.datetime(2023, 1, 3, 12, 0))]

In [0]:
# Return the first row; with an explicit count of 1 the result is a one-element list.
data_df.head(1)

[Row(col_1=100, col_2=200.0, col_3='string_test_1', col_4=datetime.date(2023, 1, 1), col_5=datetime.datetime(2023, 1, 1, 12, 0))]

## Converting a PySpark DataFrame to a Pandas DataFrame

In [0]:
# Convert the Spark DataFrame into a pandas DataFrame (displayed as the cell result).
data_df.toPandas()

Unnamed: 0,col_1,col_2,col_3,col_4,col_5
0,100,200.0,string_test_1,2023-01-01,2023-01-01 12:00:00
1,200,300.0,string_test_2,2023-02-01,2023-01-02 12:00:00
2,300,400.0,string_test_3,2023-03-01,2023-01-03 12:00:00


## How to do Data Manipulation - Rows and Columns

In [0]:
from pyspark.sql import Column

# Select a single column by passing its Column object, looked up by name.
data_df.select(data_df["col_3"]).show()


+-------------+
|        col_3|
+-------------+
|string_test_1|
|string_test_2|
|string_test_3|
+-------------+



In [0]:
from pyspark.sql import functions as F
# Add a constant column "col_6" holding the literal "A" on every row,
# then display the updated DataFrame.
data_df = data_df.withColumn(colName="col_6", col=F.lit("A"))
data_df.show()


+-----+-----+-------------+----------+-------------------+-----+
|col_1|col_2|        col_3|     col_4|              col_5|col_6|
+-----+-----+-------------+----------+-------------------+-----+
|  100|200.0|string_test_1|2023-01-01|2023-01-01 12:00:00|    A|
|  200|300.0|string_test_2|2023-02-01|2023-01-02 12:00:00|    A|
|  300|400.0|string_test_3|2023-03-01|2023-01-03 12:00:00|    A|
+-----+-----+-------------+----------+-------------------+-----+



In [0]:
# Remove the col_5 column, rebind data_df to the result, and display it.
data_df = data_df.drop("col_5")
data_df.show()


+-----+-----+-------------+----------+-----+
|col_1|col_2|        col_3|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|200.0|string_test_1|2023-01-01|    A|
|  200|300.0|string_test_2|2023-02-01|    A|
|  300|400.0|string_test_3|2023-03-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
# Display col_2 divided by 100; data_df itself is not reassigned here.
data_df.withColumn("col_2", data_df["col_2"] / 100).show()

+-----+-----+-------------+----------+-----+
|col_1|col_2|        col_3|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|  2.0|string_test_1|2023-01-01|    A|
|  200|  3.0|string_test_2|2023-02-01|    A|
|  300|  4.0|string_test_3|2023-03-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
# Rename col_3 to string_col and display the renamed DataFrame.
data_df = data_df.withColumnRenamed(existing="col_3", new="string_col")
data_df.show()


+-----+-----+-------------+----------+-----+
|col_1|col_2|   string_col|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|200.0|string_test_1|2023-01-01|    A|
|  200|300.0|string_test_2|2023-02-01|    A|
|  300|400.0|string_test_3|2023-03-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
# Show the unique values present in col_6.
data_df.select("col_6").dropDuplicates().show()

+-----+
|col_6|
+-----+
|    A|
+-----+



In [0]:
# Count how many distinct values col_6 holds, labelled "Total_Unique".
data_df.select(
    F.countDistinct(F.col("col_6")).alias("Total_Unique")
).show()

+------------+
|Total_Unique|
+------------+
|           1|
+------------+



In [0]:
from pyspark.sql.functions import upper

# Display an extra column with string_col converted to upper case.
data_df.withColumn(
    'upper_string_col',
    upper(data_df['string_col']),
).show()


+-----+-----+-------------+----------+-----+----------------+
|col_1|col_2|   string_col|     col_4|col_6|upper_string_col|
+-----+-----+-------------+----------+-----+----------------+
|  100|200.0|string_test_1|2023-01-01|    A|   STRING_TEST_1|
|  200|300.0|string_test_2|2023-02-01|    A|   STRING_TEST_2|
|  300|400.0|string_test_3|2023-03-01|    A|   STRING_TEST_3|
+-----+-----+-------------+----------+-----+----------------+



In [0]:
# Keep only the rows whose col_1 equals 100.
data_df.where(data_df["col_1"] == 100).show()

+-----+-----+-------------+----------+-----+
|col_1|col_2|   string_col|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|200.0|string_test_1|2023-01-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
# Rows must satisfy BOTH conditions; each comparison is parenthesised
# because & binds tighter than ==.
data_df.where(
    (data_df["col_1"] == 100) & (data_df["col_6"] == 'A')
).show()


+-----+-----+-------------+----------+-----+
|col_1|col_2|   string_col|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|200.0|string_test_1|2023-01-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
# Rows may satisfy EITHER condition; each comparison is parenthesised
# because | binds tighter than ==.
data_df.where(
    (data_df["col_1"] == 100) | (data_df["col_2"] == 300.00)
).show()


+-----+-----+-------------+----------+-----+
|col_1|col_2|   string_col|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|200.0|string_test_1|2023-01-01|    A|
|  200|300.0|string_test_2|2023-02-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
# Values of col_1 to keep. The variable gets a descriptive name so the
# built-in `list` type is not shadowed for the rest of the notebook.
col_1_values = [100, 200]
data_df.filter(data_df.col_1.isin(col_1_values)).show()


+-----+-----+-------------+----------+-----+
|col_1|col_2|   string_col|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|200.0|string_test_1|2023-01-01|    A|
|  200|300.0|string_test_2|2023-02-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType

# Cast col_4 to string and col_1 to integer into a new DataFrame,
# leaving data_df itself unchanged.
data_df_2 = (
    data_df
    .withColumn("col_4", col("col_4").cast(StringType()))
    .withColumn("col_1", col("col_1").cast(IntegerType()))
)
data_df_2.printSchema()
# Display the casted frame so the table corresponds to the schema printed above.
data_df_2.show()



root
 |-- col_1: integer (nullable = true)
 |-- col_2: double (nullable = true)
 |-- string_col: string (nullable = true)
 |-- col_4: string (nullable = true)
 |-- col_6: string (nullable = false)

+-----+-----+-------------+----------+-----+
|col_1|col_2|   string_col|     col_4|col_6|
+-----+-----+-------------+----------+-----+
|  100|200.0|string_test_1|2023-01-01|    A|
|  200|300.0|string_test_2|2023-02-01|    A|
|  300|400.0|string_test_3|2023-03-01|    A|
+-----+-----+-------------+----------+-----+



In [0]:
# Cast back using SQL expressions: col_4 to date and col_1 to long.
# Only the two listed expressions end up in the result.
data_df_3 = data_df_2.selectExpr(
    "cast(col_4 as date) col_4",
    "cast(col_1 as long) col_1",
)
data_df_3.printSchema()


root
 |-- col_4: date (nullable = true)
 |-- col_1: long (nullable = true)



In [0]:
# Register data_df_3 as a temporary view so it can be queried with SQL.
data_df_3.createOrReplaceTempView(name="CastExample")
# Cast through SQL functions: col_1 to double and col_4 to date.
data_df_4 = spark.sql(
    sqlQuery="SELECT DOUBLE(col_1), DATE(col_4) from CastExample"
)
data_df_4.printSchema()
data_df_4.show(truncate=False)


root
 |-- col_1: double (nullable = true)
 |-- col_4: date (nullable = true)

+-----+----------+
|col_1|col_4     |
+-----+----------+
|100.0|2023-01-01|
|200.0|2023-02-01|
|300.0|2023-03-01|
+-----+----------+



In [0]:
# Raw employee records as (Employee, Department, Salary) tuples.
# Department is None for two of them and one record appears twice.
# The list has its own name so `salary_data` only ever refers to the DataFrame.
salary_rows = [
    ("John", "Field-eng", 3500),
    ("Michael", "Field-eng", 4500),
    ("Robert", None, 4000),
    ("Maria", "Finance", 3500),
    ("John", "Sales", 3000),
    ("Kelly", "Finance", 3500),
    ("Kate", "Finance", 3000),
    ("Martin", None, 3500),
    ("Kiran", "Sales", 2200),
    ("Michael", "Field-eng", 4500),
]
columns = ["Employee", "Department", "Salary"]
salary_data = spark.createDataFrame(data=salary_rows, schema=columns)
salary_data.printSchema()
salary_data.show()


root
 |-- Employee: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|    John| Field-eng|  3500|
| Michael| Field-eng|  4500|
|  Robert|      NULL|  4000|
|   Maria|   Finance|  3500|
|    John|     Sales|  3000|
|   Kelly|   Finance|  3500|
|    Kate|   Finance|  3000|
|  Martin|      NULL|  3500|
|   Kiran|     Sales|  2200|
| Michael| Field-eng|  4500|
+--------+----------+------+



In [0]:
# Display the rows that contain no null values.
salary_data.na.drop().show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|    John| Field-eng|  3500|
| Michael| Field-eng|  4500|
|   Maria|   Finance|  3500|
|    John|     Sales|  3000|
|   Kelly|   Finance|  3500|
|    Kate|   Finance|  3000|
|   Kiran|     Sales|  2200|
| Michael| Field-eng|  4500|
+--------+----------+------+



In [0]:
# show() only prints and returns None, so store the de-duplicated DataFrame
# in new_salary_data first and display it as a separate step.
new_salary_data = salary_data.dropDuplicates()
new_salary_data.show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|    John| Field-eng|  3500|
| Michael| Field-eng|  4500|
|  Robert|      NULL|  4000|
|    John|     Sales|  3000|
|   Maria|   Finance|  3500|
|   Kelly|   Finance|  3500|
|    Kate|   Finance|  3000|
|  Martin|      NULL|  3500|
|   Kiran|     Sales|  2200|
+--------+----------+------+



## Using Aggregates in a DataFrame

In [0]:
from pyspark.sql.functions import countDistinct, avg
# Average of the Salary column across all rows.
salary_data.select(avg(salary_data['Salary'])).show()


+-----------+
|avg(Salary)|
+-----------+
|     3520.0|
+-----------+



In [0]:
# Number of Salary values, using the column-function form of the aggregation.
salary_data.agg(F.count('Salary')).show()

+-------------+
|count(Salary)|
+-------------+
|           10|
+-------------+



In [0]:
# Number of distinct Salary values, labelled "Distinct Salary".
salary_data.select(
    countDistinct(salary_data["Salary"]).alias("Distinct Salary")
).show()

+---------------+
|Distinct Salary|
+---------------+
|              5|
+---------------+



In [0]:
# Highest Salary value, using the column-function form of the aggregation.
salary_data.agg(F.max('Salary')).show()

+-----------+
|max(Salary)|
+-----------+
|       4500|
+-----------+



In [0]:
# Total of all Salary values, using the column-function form of the aggregation.
salary_data.agg(F.sum('Salary')).show()

+-----------+
|sum(Salary)|
+-----------+
|      35200|
+-----------+



In [0]:
# Display the rows sorted by Salary, lowest first.
salary_data.sort("Salary").show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|   Kiran|     Sales|  2200|
|    John|     Sales|  3000|
|    Kate|   Finance|  3000|
|    John| Field-eng|  3500|
|   Maria|   Finance|  3500|
|   Kelly|   Finance|  3500|
|  Martin|      NULL|  3500|
|  Robert|      NULL|  4000|
| Michael| Field-eng|  4500|
| Michael| Field-eng|  4500|
+--------+----------+------+



In [0]:
# Display the rows sorted by Salary, highest first.
salary_data.orderBy("Salary", ascending=False).show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
| Michael| Field-eng|  4500|
| Michael| Field-eng|  4500|
|  Robert|      NULL|  4000|
|    John| Field-eng|  3500|
|   Maria|   Finance|  3500|
|   Kelly|   Finance|  3500|
|  Martin|      NULL|  3500|
|    John|     Sales|  3000|
|    Kate|   Finance|  3000|
|   Kiran|     Sales|  2200|
+--------+----------+------+

