# Chapter 1

### Spark dataframe

```
# Create dataframe from RDD
spark_df = spark.createDataFrame(RDD, schema=colname_list)
# Loading file (a folder name makes Spark load all files in that folder in parallel)
df = spark.read.csv("file.csv", header=True, inferSchema=True) # also .json, .text; .load / .parquet for parquet
df.show(3) # Print the first 3 rows
df.collect() # Return all rows to the driver as a list of Row objects (tuple-like)
df.limit(3) # New dataframe with at most 3 rows (unlike show(), prints nothing)
df.dtypes # See datatype of each column
df.printSchema() # See schema information
df.columns # See the column names
df = df.na.drop(subset=["col_name"]) # Drop rows that have nulls in the given column(s)
df = df.drop("col_name") # Drop column (drop() takes column names; it has no `subset` argument)
df = df.dropDuplicates() # Drop duplicate rows
df = df.withColumn("col_name", col("col_name").cast("float"))  # Way 1 : Casting a column to another data type
df = df.withColumn("col_name", df.col_name.cast("float")) # Way 2 : Casting a column to another data type
df.describe().show() # Summary stats
df = df.repartition(4, 'some_col') # Create 4 partitions; rows with the same 'some_col' value land in the same partition
print(df.rdd.getNumPartitions()) # See the number of partitions of the dataset

# Selecting, adding and renaming columns.
# col, lit, monotonically_increasing_id and array_contains come from pyspark.sql.functions.
df = df.select(df.col1, df.col2, df.col3) # Way 1 : select columns via dataframe attributes
df = df.select("col1", "col2") # Way 2 : select columns by name
df.select(col('col1'), col('col2')) # Way 3 : select columns via col(); the result is not assigned here
df = df.withColumn("new_col",df.old_col+10) # Add a new column computed from an existing one
df = df.withColumnRenamed("old_col_name", "new_col_name") # Rename column
df = df.select(col('col1').alias('col1_renamed'), 'col2') # Rename while selecting, using alias()
df = df.selectExpr("col1", "col2", "col3", "col1/(col2/60) as another_col") # Select using SQL expressions
df = df.withColumn("idx", monotonically_increasing_id()) # Create an id column (unique and increasing, not necessarily consecutive)
df.where(array_contains('col', 'abc')) # Keep rows whose array column 'col' contains 'abc'
df1 = df1.withColumn("source", lit("df1")) # Add a column holding a constant value

df_vertical = df1.union(df2) # Vertical join: append rows (columns are matched by position, not by name)
df_horizontal = df1.join(df2, on=['common_col1', 'common_col2'], how="left") # Horizontal join: append df2's columns to the matching df1 rows
df_cross = df1.crossJoin(df2) # Cross join: every combination of a df1 row with a df2 row

# Filtering (filter() and where() are aliases; SQL-string and Column conditions give the same result)
df.filter("col_name > 120").show() # show() only prints and returns None, so its result must not be assigned to df
df = df.where("Value > 120")
df.filter(df.col_name > 120).show()
df = df.where(df.Value > 120)
filterA = df.col1 == "SEA"
filterB = df.col2 == "PDX"
result = df.filter(filterA).filter(filterB) # Chaining filters (rows must satisfy both conditions)
df.groupBy("col_name").count() # Group by and count
df.orderBy("col_name") # Order by
df.filter(df.col == 'value').groupBy().max("another_col") # Chain a filter with an aggregation over all remaining rows

# Working with the catalog. The lines are independent examples: once the view has been
# dropped it has to be registered again before it can be read or queried.
df.createOrReplaceTempView("table_name") # Register DataFrame as a temporary table in the catalog
spark.catalog.listTables() # See all table information in the catalog
spark.catalog.dropTempView('table_name') # Remove temp table from catalog
spark_df = spark.table("table_name") # Start using a Spark table as a Spark dataframe
result = spark.sql("SELECT * FROM table_name") # Run query on table

# Using Custom function to double the value of a column
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def double_val(col):
    """Return ``col`` multiplied by two.

    The value is meant to be used as the body of a Spark UDF, so the result
    should be of the type declared as the UDF's return type.
    """
    doubled = col * 2
    return doubled
# Wrap the Python function as a Spark UDF; the second argument declares the UDF's return type.
double_val_udf = udf(double_val, IntegerType()) # Register UDF with custom function and return type
df = df.withColumn("DoubledCol", double_val_udf(df["col"])) # Add a column holding the UDF result for each row

## Visualization : Pyspark_dist_explore, pandas (NOT RECOMMENDED), HandySpark (RECOMMENDED)
pandas_df = spark_df.toPandas() # Convert to a pandas dataframe (all rows are collected on the driver)
handy_df = spark_df.toHandy() # Convert to handyspark dataframe (presumably needs the handyspark package imported first)
handy_df.cols["col_name"].hist() # Histogram of one column
spark_df = handy_df.to_spark() # Convert back to a pyspark dataframe

## NOTE : the same 4 values written as a plain array and as a sparse vector
# Array: [1.0, 0.0, 0.0, 3.0]
# Sparse vector: (4, [0, 3], [1.0, 3.0])  i.e. (size, indices of non-zero values, non-zero values)
```

### Spark SQL

```
df = spark.read.csv("filename.csv", header=True)
df.createOrReplaceTempView("table_name") # Register the dataframe so SQL can refer to it as table_name
result = spark.sql("SELECT * FROM table_name") # Simple query, result saved as dataframe
result.show()
result = spark.sql("DESCRIBE table_name") # See table information (must use the same name the view was registered under)

# Window functions: number the rows of each train_id in order of time
query = """
SELECT *,
ROW_NUMBER() OVER(PARTITION BY train_id ORDER BY time) AS id
FROM schedule
"""
spark.sql(query)

# Equivalent dot notation
# (needs `from pyspark.sql import Window` and `from pyspark.sql.functions import row_number`;
#  presumably df is the dataframe registered as the `schedule` table)
window = Window.partitionBy('train_id').orderBy('time')
dfx = df.withColumn('id', row_number().over(window))

# CASE .. WHEN: derive a title from the id range (the first matching branch wins)
query = """
SELECT id,
    CASE
        WHEN id < 25000 THEN 'Preface'
        WHEN id < 50000 THEN 'Chapter 1'
        WHEN id < 75000 THEN 'Chapter 2'
        ELSE 'Chapter 3'
    END AS title
FROM df
"""
spark.sql(query)

# Equivalent dot notation (needs `from pyspark.sql.functions import when`);
# unlike the query above, this keeps every column of df and adds 'title'
df2 = df.withColumn('title', when(df.id < 25000, 'Preface')
.when(df.id < 50000, 'Chapter 1')
.when(df.id < 75000, 'Chapter 2')
.otherwise('Chapter 3'))

```

# Chapter 2

### Spark regex

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, regexp_replace

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Regex Example") \
    .getOrCreate()

# Sample data
data = [("John", "Contact me at (123) 456-7890 or (456) 789 0123"),
        ("Alice", "No phone number in this text"),
        ("Bob", "Call me at (555) 555-5555 or (555) 123-4567")]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "text"])
# Pattern Matching: Check if each string contains a phone number in the specified format
df.createOrReplaceTempView("df")
df = df.withColumn("has_phone_number", df["text"].rlike(r'\(\d{3}\) \d{3}[- ]\d{4}'))

# Group Matching: Extract area code, exchange code, and subscriber number
phone_pattern = r'\((\d{3})\) (\d{3})[- ](\d{4})'
df = df.withColumn("area_code", regexp_extract("text", phone_pattern, 1))
df = df.withColumn("exchange_code", regexp_extract("text", phone_pattern, 2))
df = df.withColumn("subscriber_number", regexp_extract("text", phone_pattern, 3))

# Replace: Replace phone numbers with "PHONE_NUMBER_REDACTED"
df = df.withColumn("redacted_text", regexp_replace("text", phone_pattern, "PHONE_NUMBER_REDACTED"))

# Show DataFrame
# df.show() # truncate=False

# Stop SparkSession
spark.stop()
```


### Pyspark string

```
# Lowercase the strings in a column
df = df.select(lower(col('col_name')))
# Replace every match of a pattern in a column
df = df1.select(regexp_replace('col_name', 'old', 'new').alias('new_col'))
# Split a string on space
df = df.select(split('string_col', '[ ]').alias('word_list'))
# Split a string on a space or any of the given symbols
# (raw string: the backslashes are regex escapes, not Python escape sequences)
punctuation = r"_|.\?\!\",\'\[\]\*()"
df = df.select(split('string_col', '[ %s]' % punctuation).alias('word_list'))
# Explode the list column so that each row contains one value of the list
df = df.select(explode('word_list').alias('word'))
# Filter out the empty strings left by the split; this is done after the explode because
# 'word' is a string column, whereas 'word_list' is an array and cannot be compared with ''
df = df.filter(col('word') != '')

```

# Chapter 3

### Caching, Logging

- Caching is a lazy operation (nothing is cached until an action runs).
- Cache only when necessary (i.e. when multiple operations reuse the same dataframe or table).
- The eviction policy is Least Recently Used (LRU).
- Eviction happens independently on each worker.
- Uncache the data when it is no longer needed.

```
# Caching dataframe
df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK) # Cache a dataframe with an explicit storage level
df.cache() # Alternative : cache a dataframe
df.is_cached # Check if the dataframe is cached
df.storageLevel # How the dataframe is cached (useDisk, useMemory, useOffHeap, deserialized, replication)
df.unpersist() # Uncache a dataframe
# Caching table
df.createOrReplaceTempView('df') # Register dataframe as table
spark.catalog.cacheTable('df') # Cache the table
spark.catalog.isCached(tableName='df') # Check if the table is cached
spark.catalog.uncacheTable('df') # Uncache table
spark.catalog.clearCache() # Clear all cache

# Inspect cache operations and query plans in the Spark web UI at localhost:4040

# Logging for inspecting
import logging
import sys
from timeit import default_timer as timer # Clock used for the elapsed-time measurements below

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # level may also be logging.DEBUG, ...
logger = logging.getLogger() # Root logger: same destination as the module-level logging.* calls
logging.info("Hello %s", "world") # INFO level print
logging.debug("Hello, take %d", 2) # DEBUG level print (not shown while the level is INFO)
ENABLED = False
t = timer() # Start time
logger.info("No action here.")
elapsed1 = timer() - t # Seconds elapsed since the start (later reading minus earlier one)
if ENABLED: # Guard: df.count() is a Spark action and would run even if the message were filtered out
    logger.info("df has %d rows.", df.count())
elapsed2 = timer() - t

# Log columns of text_df as debug message
logging.debug("text_df columns: %s", df1.columns)
# Log whether table1 is cached as info message
logging.info("table1 is cached: %s", spark.catalog.isCached(tableName="table1"))
# Log first row of text_df as warning message
logging.warning("The first row of text_df:\n %s", df1.first())
# Log selected columns of text_df as error message
logging.error("Selected columns: %s", df1.select("id", "word"))
logging.disable(logging.DEBUG) # Suppress messages of level DEBUG and below (pass logging.CRITICAL to silence everything)

# Query plan analysis
df.cache()
df.explain() # Print the query plan of the dataframe (read it bottom-up)
spark.sql('EXPLAIN SELECT * FROM df').first() # Same analysis using SQL; the plan comes back as the first row
```

### Logging in python

```
import logging
import sys

# Root logger: records below INFO are dropped before they reach any handler.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# One shared layout: "timestamp | level | message", with a month-day-year timestamp.
formatter = logging.Formatter(
    fmt='%(asctime)s | %(levelname)s | %(message)s',
    datefmt='%m-%d-%Y %H:%M:%S',
)

# Two destinations: the file logs.log and standard output.
file_handler = logging.FileHandler('logs.log')
stdout_handler = logging.StreamHandler(sys.stdout)

# Configure both handlers identically and attach them (file first, then stdout).
for handler in (file_handler, stdout_handler):
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

########### Simple way #############
import logging

# One-call setup: write "<source file>: <message>" lines to test.log, from DEBUG level up.
# Note: basicConfig() does nothing if the root logger already has handlers configured.
logging.basicConfig(filename='test.log', format='%(filename)s: %(message)s',
                    level=logging.DEBUG)

# One message per standard level, in increasing order of severity
logging.debug('This is a debug message')
logging.info('This is an info message')
logging.warning('This is a warning message')
logging.error('This is an error message')
logging.critical('This is a critical message')


```

# Chapter 4

### Sparse data

```
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors
# The same 5 values stored densely (every value) and sparsely (only the non-zero ones)
dense_vector = DenseVector([1.0, 0.0, 3.0, 0.0, 5.0]) # [1.0,0.0,3.0,0.0,5.0]
sparse_vector = SparseVector(5, [0, 2, 4], [1.0, 3.0, 5.0]) # (size, indices, values) representation; all other entries are 0

from pyspark.sql.functions import udf
# Given you have "spark" session and "dense_array" column
########### DENSE TO SPARSE #############
def dense_to_sparse(dense_array):
    """Convert a dense sequence of numbers into a sparse ML vector.

    Args:
        dense_array: sequence of numeric values, or None for a null cell.

    Returns:
        A sparse vector of the same length holding only the non-zero
        entries, or None when ``dense_array`` is None.
    """
    if dense_array is None:
        return None
    values = [float(v) for v in dense_array]
    # Built explicitly from {index: value} rather than via `.toSparse()`, which is not
    # part of the documented Python DenseVector API (NOTE(review): confirm for your Spark version).
    nonzero = {i: v for i, v in enumerate(values) if v != 0.0}
    return Vectors.sparse(len(values), nonzero)

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import ArrayType, DoubleType
# dense_to_sparse returns an ML vector, so the declared return type must be VectorUDT
# (an ArrayType declaration does not match a vector result).
dense_to_sparse_udf = udf(dense_to_sparse, VectorUDT())
sparse_df = dense_df.withColumn("sparse_array", dense_to_sparse_udf("dense_array"))

########### SPARSE TO DENSE  #############
def sparse_to_dense(sparse_array):
    """Expand ``sparse_array`` into a plain Python list of floats.

    Args:
        sparse_array: an indexable, sized sequence of numbers (anything that
            supports ``len()`` and iteration).

    Returns:
        A list with one float per position, zeros included.
    """
    indices = [i for i, v in enumerate(sparse_array) if v != 0]
    values = [sparse_array[i] for i in indices]
    dense = Vectors.sparse(len(sparse_array), indices, values).toArray()
    # toArray() yields a numpy array; a UDF declared as ArrayType(DoubleType())
    # needs a plain list, so convert before returning.
    return dense.tolist()

from pyspark.sql.types import ArrayType, DoubleType
# Register the converter as a UDF that returns an array of doubles,
# then add its result for each row as the "dense_array" column.
sparse_to_dense_udf = udf(sparse_to_dense, ArrayType(DoubleType()))
dense_df = sparse_df.withColumn("dense_array", sparse_to_dense_udf("sparse_array"))

############## FOR TEXT DATA ###################

from pyspark.ml.feature import CountVectorizer, VectorAssembler # Efficient sparse representation for text
# Fit a vocabulary on text_df, then turn "text_col" into count vectors in "features"
# (assumes text_col already holds tokenized text, i.e. an array of strings — TODO confirm)
vectorizer = CountVectorizer(inputCol="text_col", outputCol="features") # Estimator: learns the vocabulary
vectorizer_model = vectorizer.fit(text_df)
sparse_df = vectorizer_model.transform(text_df)

# Convert sparse vectors to dense vectors
# NOTE(review): VectorAssembler is not guaranteed to emit dense vectors (it may keep the
# more compact representation) — verify the output, or convert with a UDF based on toArray()
vector_assembler = VectorAssembler(inputCols=["features"], outputCol="dense_features")
dense_df = vector_assembler.transform(sparse_df).select("dense_features")

# Convert sparse vector column to array
def sparse_to_array(sparse_vector):
    """Return the contents of ``sparse_vector`` as a plain Python list.

    The vector is first expanded with ``toArray()`` and the resulting
    array is then converted with ``tolist()``.
    """
    dense_values = sparse_vector.toArray()
    return dense_values.tolist()

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
# Register the converter as a UDF that returns an array of doubles,
# then add the list form of "features" as the "array_features" column.
sparse_to_array_udf = udf(sparse_to_array, ArrayType(DoubleType()))
array_df = sparse_df.withColumn("array_features", sparse_to_array_udf("features"))

### Checking if sparse array is empty
def is_sparse_array_empty(sparse_array):
    """Tell whether ``sparse_array`` has no non-zero entries.

    Returns the result of comparing ``sparse_array.numNonzeros()`` with zero.
    """
    nonzero_count = sparse_array.numNonzeros()
    return nonzero_count == 0

from pyspark.sql.types import BooleanType # Return type of the UDF below (not imported elsewhere in this snippet)
is_sparse_array_empty_udf = udf(is_sparse_array_empty, BooleanType())
df_with_boolean = df.withColumn("is_empty", is_sparse_array_empty_udf("sparse_col")) # True where the vector has no non-zero entries

### Using machine learning
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train_data, test_data = df.randomSplit([0.7, 0.3], seed=123) # Roughly 70/30 split; the seed makes it repeatable
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data) # Train on the training split
predictions = lr_model.transform(test_data) # Score the held-out split
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions) # Alternative: lr_model.evaluate(test_data) returns a summary object
```