In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spillExample').getOrCreate()

In [12]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark.sparkContext.setJobDescription('spill')  # Setting Job description
data = range(0, 10000000)
# Create DataFrame with a lot of records
df = spark.createDataFrame(data, "int").toDF("id")
df.cache().count()

# Create a temporary view so we can run SQL queries
df.createOrReplaceTempView("records")

# Force a spill. The high number of partitions and usage of `UNION ALL`
# will generate a big number of records in memory
df2 = spark.sql("""
    SELECT /*+ REPARTITION(1000) */ id 
    FROM records 
    UNION ALL 
    SELECT /*+ REPARTITION(1000) */ id 
    FROM records
""")
df2.cache().count()

24/06/05 17:39:15 WARN TaskSetManager: Stage 0 contains a task of very large size (6737 KiB). The maximum recommended task size is 1000 KiB.
24/06/05 17:39:20 WARN TaskSetManager: Stage 1 contains a task of very large size (6737 KiB). The maximum recommended task size is 1000 KiB.
24/06/05 17:39:22 WARN TaskSetManager: Stage 4 contains a task of very large size (6737 KiB). The maximum recommended task size is 1000 KiB.
24/06/05 17:39:27 WARN TaskSetManager: Stage 5 contains a task of very large size (6737 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

20000000

In [45]:
spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
import random

# Spark session oluşturuluyor
spark.sparkContext.setJobDescription('skew')  # Setting Job description
sc = spark.sparkContext

# Büyük bir RDD yaratılıyor
rdd = sc.parallelize([(i, random.randint(0,1000000)) for i in range(10000000)])

# Key'ler bazen aynı olacak şekilde RDD'ye bozuk veri ekleniyor
rdd = rdd.union(sc.parallelize([(0, i) for i in range(1000000)]))

# Reduce operasyonunda shuffle ve skewed data oluştuğundan emin olunur
result_rdd = rdd.reduceByKey(lambda a, b: a + b)

result_rdd.collect()

In [261]:
spark.sparkContext.setJobDescription('scan tiny files')  # Setting Job description

weather = spark.read.parquet("/Users/ugurkalkavan/Downloads/m06sparkbasics/weather")
weather.filter(weather["year"]==2022).count()

450787193

In [487]:
spark.sparkContext.setJobDescription('scan tiny files 2')  # Setting Job description

df_str = spark.read.parquet("/Users/ugurkalkavan/tmp/df_str")
df_str.count()

100000000

In [61]:
import requests
import json
import time

# Spark UIs API endpoint, 'local' is Spark's default master URL, replace 'local' with your Spark master
SPARK_API_ENDPOINT = "http://localhost:4041/api/v1/applications"

def fetch_spark_apps(spark):
    try:
        
        app_id = spark.sparkContext.applicationId
        return app_id

    except Exception as e:
        print(f"Error in fetching Spark Apps: {str(e)}")

In [None]:
def sizeof_fmt(num, suffix="B"):
    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(num) < 1024.0:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"

In [217]:
def convert_size(size_string, suffix="B"):
    units = {"": 0, "Ki": 1, "Mi": 2, "Gi": 3, "Ti": 4, "Pi": 5, "Ei": 6, "Zi": 7, "Yi": 8}
    
    # Boşlukla ayrılmış son iki değeri al
    size, unit = size_string.split()[-2:]
    size = float(size)

    # İlgili çarpanı bulmak için sözlükten ilgili uniti al
    power = units[unit.replace(suffix, '')]

    # İlgili çarpan kadar 1024 ile çarp
    return size * (1024 ** power)

In [65]:
def custom_scaler(data):
    data_min = min(data)
    data_range = max(data) - data_min
    scaled_data = [(item-data_min)/data_range for item in data]
    return scaled_data

In [67]:
import numpy as np

def detect_anomalies(data, m=1.4):
    data_min = min(data)
    data_range = max(data) - data_min
    if data_range == 0:  # if all data points are the same
        return False
    else:
        scaled_data = [(item - data_min) / data_range for item in data]

        # Detect outliers
        mean = np.mean(scaled_data)
        std_dev = np.std(scaled_data)

        return not any((abs(mean - value) > m * std_dev) for value in scaled_data)

In [532]:
def get_scan_parquet_metrics(app_id):
    metrics_dict = {}
    try:
        response = requests.get(f"{SPARK_API_ENDPOINT}/{app_id}/sql")
        sql = json.loads(response.text)
        print(response.text)
        for item in sql:
            sql_id = item['id']
            metrics_dict[sql_id] = {}
            for node in item['nodes']:
                if node['nodeName'] == 'Scan parquet':
                    node_metrics = {}
                    for metric in node['metrics']:
                        node_metrics[metric['name']] = metric['value']
                    metrics_dict[sql_id] = node_metrics    
        return metrics_dict
    except Exception as e:
        print(f"Error in fetching metrics: {str(e)}")

In [534]:
def detect_tiny_files(metrics_dict: dict):
    for sql_id, metrics in metrics_dict.items():
        print(metrics)
        
        print(f"Processing SQL_ID: {sql_id}")
        
        files_read = int(metrics['number of files read'].replace(',', '')) if isinstance(metrics['number of files read'], str) else metrics['number of files read']
       
        # 'number of partitions read' metriği yoksa, değeri 1 olarak ayarla
        partitions_read = 1
        if 'number of partitions read' in metrics:
            partitions_read = int(metrics['number of partitions read'].replace(',', '')) if isinstance(metrics['number of partitions read'], str) else metrics['number of partitions read']


        size_read = convert_size(metrics['size of files read'])
        
        average_number_of_file_by_partition = files_read / partitions_read
        average_file_size = size_read / files_read
        
        # Metriklerin değerlerini kontrol et
        print("Number of files read:            ", metrics['number of files read'])
        print("Scan time:                       ", metrics['scan time'])
        if 'Dynamic partition pruning time' in metrics:
            print("Dynamic partition pruning time:  ", metrics['dynamic partition pruning time'])
        print("Metadata time:                   ", metrics['metadata time'])
        print("Size of files read:              ", metrics['size of files read'])
        print("Number of output rows:           ", metrics['number of output rows'])
        if 'Number of partitions read' in metrics:
            print("Dynamic partition pruning time:  ", metrics['number of partitions read'])
        
        print(f"Average number of files by partition: {average_number_of_file_by_partition}")
        print(f"Average file size: {sizeof_fmt(average_file_size)}")
        
        if average_number_of_file_by_partition > 1 and size_read/partitions_read < 134217728 :
            print("Tiny file PROBLEM!!!")

        print()
        

In [536]:
metrics = get_scan_parquet_metrics(app_id)

[ {
  "id" : 0,
  "status" : "COMPLETED",
  "description" : "scan tiny files",
  "planDescription" : "== Physical Plan ==\nAdaptiveSparkPlan (12)\n+- == Final Plan ==\n   * HashAggregate (7)\n   +- ShuffleQueryStage (6), Statistics(sizeInBytes=7.7 KiB, rowCount=494)\n      +- Exchange (5)\n         +- * HashAggregate (4)\n            +- * Project (3)\n               +- * ColumnarToRow (2)\n                  +- Scan parquet  (1)\n+- == Initial Plan ==\n   HashAggregate (11)\n   +- Exchange (10)\n      +- HashAggregate (9)\n         +- Project (8)\n            +- Scan parquet  (1)\n\n\n(1) Scan parquet \nOutput [3]: [year#5, month#6, day#7]\nBatched: true\nLocation: InMemoryFileIndex [file:/Users/ugurkalkavan/Downloads/m06sparkbasics/weather]\nReadSchema: struct<>\n\n(2) ColumnarToRow [codegen id : 1]\nInput [3]: [year#5, month#6, day#7]\n\n(3) Project [codegen id : 1]\nOutput: []\nInput [3]: [year#5, month#6, day#7]\n\n(4) HashAggregate [codegen id : 1]\nInput: []\nKeys: []\nFunctions [

In [527]:
detect_tiny_files(metrics)

{'number of files read': '8,796', 'scan time': 'total (min, med, max (stageId: taskId))\n39.9 s (6 ms, 51 ms, 655 ms (stage 1.0: task 3))', 'dynamic partition pruning time': '0 ms', 'metadata time': '38 ms', 'size of files read': '27.7 GiB', 'number of output rows': '3,604,627,124', 'number of partitions read': '2,932'}
Processing SQL_ID: 0
Number of files read:             8,796
Scan time:                        total (min, med, max (stageId: taskId))
39.9 s (6 ms, 51 ms, 655 ms (stage 1.0: task 3))
Metadata time:                    38 ms
Size of files read:               27.7 GiB
Number of output rows:            3,604,627,124
Average number of files by partition: 3.0
Average file size: 3.2 MiB
Tiny file PROBLEM!!!

{'number of files read': '8,796', 'scan time': 'total (min, med, max (stageId: taskId))\n17.3 s (1 ms, 25 ms, 387 ms (stage 5.0: task 501))', 'dynamic partition pruning time': '0 ms', 'metadata time': '11 ms', 'size of files read': '27.7 GiB', 'number of output rows': '3,

In [277]:
def sql_details(app_id):
    try:
        response = requests.get(f"{SPARK_API_ENDPOINT}/{app_id}/sql")
        sql = json.loads(response.text)
        #print(response.text)
        #print(sql['nodes'])
        metrics_dict = get_scan_parquet_metrics(app_id)
        return metrics_dict
    except Exception as e:
        print(e)
        print(f"Error in fetching Job and Stage details: {str(e)}")

In [279]:
sql_details(app_id)

{'number of files read': '1,101',
 'scan time': 'total (min, med, max (stageId: taskId))\n1.8 s (8 ms, 28 ms, 58 ms (stage 9.0: task 999))',
 'dynamic partition pruning time': '0 ms',
 'metadata time': '24 ms',
 'size of files read': '3.5 GiB',
 'number of output rows': '450,787,193',
 'number of partitions read': '367'}

In [81]:
def job_and_stage_details(app_id):
    try:
            response = requests.get(f"{SPARK_API_ENDPOINT}/{app_id}/jobs")
            jobs = json.loads(response.text)
            
            #print(response.text)
            
            response = requests.get(f"{SPARK_API_ENDPOINT}/{app_id}/stages/?withSummaries=true&quantiles=0.0,0.25,0.5,0.75,1.0")
            stages = json.loads(response.text)
            #print(response.text)
            

            
            
         
    
            #print(stages)
            #for stage in stages:
            for job in jobs:
                job_id = job["jobId"]
                job_name = job["name"]
                job_stageIds = job["stageIds"]
                
               


                for s_id in job_stageIds:
                    for stage in stages:
                        if stage['stageId'] == s_id:
                            if stage['status'] == 'SKIPPED':
                                continue
                            
                            
                            #print(stage["taskMetricsDistributions"])

                            # Fetch metrics from the stage dictionary
                            input_metrics = stage['taskMetricsDistributions']['inputMetrics']
                            output_metrics = stage['taskMetricsDistributions']['outputMetrics']
                            shuffle_read_metrics = stage['taskMetricsDistributions']['shuffleReadMetrics']
                            shuffle_write_metrics = stage['taskMetricsDistributions']['shuffleWriteMetrics']

                            # Assign variables
                            bytes_read = input_metrics['bytesRead']
                            records_read = input_metrics['recordsRead']
                            bytes_written = output_metrics['bytesWritten']
                            records_written = output_metrics['recordsWritten']
                            
                            # Shuffle read metrikleri
                            shuffle_read_bytes = shuffle_read_metrics["readBytes"]
                            shuffle_read_records = shuffle_read_metrics["readRecords"]

                            # Shuffle write metrikleri
                            shuffle_write_bytes = shuffle_write_metrics["writeBytes"]
                            shuffle_write_records = shuffle_write_metrics["writeRecords"]
                            shuffle_write_time = shuffle_write_metrics["writeTime"]


                            
                            print(f"Input Bytes Read: {bytes_read}")
                            print(f"Input Records Read: {records_read}")
                            print(f"Output Bytes Written: {bytes_written}")
                            print(f"Output Records Written: {records_written}")
                            # Değişkenlerin yazdırılması
                            print(f"Shuffle Read Bytes: {shuffle_read_bytes}")
                            print(f"Shuffle Read Records: {shuffle_read_records}")
                            print(f"Shuffle Write Bytes: {shuffle_write_bytes}")
                            print(f"Shuffle Write Records: {shuffle_write_records}")
                            print(f"Shuffle Write Time: {shuffle_write_time}")
                            
                            if detect_anomalies(shuffle_read_bytes) or detect_anomalies(shuffle_write_bytes):
                                print("#### There might be skew!!! ###")
                                
                            disk_spill = stage['diskBytesSpilled']
                            memory_spill = stage['memoryBytesSpilled']

                            if disk_spill > 0 or memory_spill > 0:

                                            print()
                                            print(f"Job {job_id} - Job Name {job_name} - Stage {stage['stageId']} - Disk spilled {sizeof_fmt(disk_spill)} bytes.")
                                            print(f"Job {job_id} - Job Name {job_name} - Stage {stage['stageId']} - Memory spilled {sizeof_fmt(memory_spill)} bytes.")
                            print()
                            print("-------------------")
                            print()

    except Exception as e:
        print(e)
        print(f"Error in fetching Job and Stage details: {str(e)}")

In [83]:
app_id = fetch_spark_apps(spark)
job_and_stage_details(app_id)

[ {
  "id" : 0,
  "status" : "COMPLETED",
  "description" : "scan tiny files",
  "planDescription" : "== Physical Plan ==\nAdaptiveSparkPlan (12)\n+- == Final Plan ==\n   * HashAggregate (7)\n   +- ShuffleQueryStage (6), Statistics(sizeInBytes=7.7 KiB, rowCount=494)\n      +- Exchange (5)\n         +- * HashAggregate (4)\n            +- * Project (3)\n               +- * ColumnarToRow (2)\n                  +- Scan parquet  (1)\n+- == Initial Plan ==\n   HashAggregate (11)\n   +- Exchange (10)\n      +- HashAggregate (9)\n         +- Project (8)\n            +- Scan parquet  (1)\n\n\n(1) Scan parquet \nOutput [3]: [year#5, month#6, day#7]\nBatched: true\nLocation: InMemoryFileIndex [file:/Users/ugurkalkavan/Downloads/m06sparkbasics/weather]\nReadSchema: struct<>\n\n(2) ColumnarToRow [codegen id : 1]\nInput [3]: [year#5, month#6, day#7]\n\n(3) Project [codegen id : 1]\nOutput: []\nInput [3]: [year#5, month#6, day#7]\n\n(4) HashAggregate [codegen id : 1]\nInput: []\nKeys: []\nFunctions [

In [73]:
shuffle_write_records = [1.0, 1.0, 400.0, 400.0, 400.0]
anomalies = detect_anomalies(shuffle_write_records)

print("Anomalies:", anomalies)

Anomalies: True


In [15]:
# 10 haneli sayılardan oluşan örnek bir liste
data = [4009375.0, 5461460.0, 90484129.0, 157407576.0, 173781832.0] #TRUE
data2 = [190378691.0, 191422987.0, 192053189.0, 199053189.0, 199953189.0] #FALSE
data3 = [5150254.0, 5151575.0, 5154254.0, 5155274.0, 5314861.0] #FALSE
data4 = [58.0, 58.0, 58.0, 58.0, 59.0] #FALSE
data5 = [4009375.0, 5461460.0, 16484129.0, 157407576.0, 173781832.0] #TRUE
data6 = [1.0, 1.0, 1.0, 1.0, 1.0] #FALSE
data6 = [0.0, 0.0, 0.0, 1.0, 1.0] #FALSE

In [9]:
print(detect_anomalies(data))
print(detect_anomalies(data2))
print(detect_anomalies(data3))
print(detect_anomalies(data4))
print(detect_anomalies(data5))
print(detect_anomalies(data6))

NameError: name 'detect_anomalies' is not defined

In [37]:
sizeof_fmt(4009375.0)


'3.8 MiB'

In [39]:
sizeof_fmt(90484129.0)

'86.3 MiB'

In [43]:
sizeof_fmt(173781832.0)

'165.7 MiB'

In [21]:
import numpy as np

In [25]:
np.mean(data)

86228874.4

In [27]:
np.median(data)

90484129.0

In [41]:
np.median(data) - np.mean(data)

4255254.599999994

In [1463]:
import math


def predict_num_partitions(files):
    ## What is the maximum size of each spark-partition (default value)?
    defaultMaxPartitionBytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b",""))

    ## What is the cost in bytes for each file (default value)?
    open_cost_bytes = int(spark.conf.get("spark.sql.files.openCostInBytes").replace("b",""))
    
    max_partition_bytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b",""))

    
    #actual_bytes = sum([file.size for file in files])               # Total size of the dataset on disk 
    actual_bytes = sum([os.path.getsize(file_path) for file_path in files])
    padded_bytes = actual_bytes + (len(files) * open_cost_bytes)          # Final size with padding from openCost

    bytes_per_core = (padded_bytes/spark.sparkContext.defaultParallelism)           # The number of bytes per core
    max_of_cost_BPC = max(open_cost_bytes, bytes_per_core)                # Larger of openCost and bytesPerCore
    target_size = min(max_partition_bytes , max_of_cost_BPC)        # Smaller of maxPartitionBytes and maxOfCostBPC
    partitions = padded_bytes /  float(target_size)                 # The final number of partitions (needs to be rounded up)



    print("defaultMaxPartitionBytes:", defaultMaxPartitionBytes)
    print("")
    print("---")
    print("File Count:", len(files))
    print("Actual Bytes:", actual_bytes)
    print("Padded Bytes:", padded_bytes, "Actual_Bytes + (File_Count * Open_Cost)")
    print("Average Size:", (padded_bytes/len(files)))
    print("---")
    print("Open Cost:", open_cost_bytes, "spark.sql.files.openCostInBytes")
    print("Bytes-Per-Core:", bytes_per_core, "padded_bytes / Default Parallelism")
    print("Max Cost:", max_of_cost_BPC, "(max of Open_Cost & Bytes-Per-Core)")
    print("---")
    print("Max Partition Bytes:", max_partition_bytes, "spark.sql.files.maxPartitionBytes")
    print("Target Size:", target_size, "(min of Max_Cost & Max_Partition_Bytes)")
    print("---")
    print("Number of Partions:", math.ceil(partitions), f"({partitions} from Padded_Bytes / Target_Size)")


In [None]:
def predict_num_files(df: Dataframe) -> int:
    ## What is the maximum size of each spark-partition (default value)?
    defaultMaxPartitionBytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b",""))

    ## What is the cost in bytes for each file (default value)?
    open_cost_bytes = int(spark.conf.get("spark.sql.files.openCostInBytes").replace("b",""))
    
    max_partition_bytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b",""))

    
    #actual_bytes = sum([file.size for file in files])               # Total size of the dataset on disk 
    actual_bytes = sum([os.path.getsize(file_path) for file_path in files])
    padded_bytes = actual_bytes + (len(files) * open_cost_bytes)          # Final size with padding from openCost

    bytes_per_core = (padded_bytes/spark.sparkContext.defaultParallelism)           # The number of bytes per core
    max_of_cost_BPC = max(open_cost_bytes, bytes_per_core)                # Larger of openCost and bytesPerCore
    target_size = min(max_partition_bytes , max_of_cost_BPC)        # Smaller of maxPartitionBytes and maxOfCostBPC
    partitions = padded_bytes /  float(target_size)                 # The final number of partitions (needs to be rounded up)



    print("defaultMaxPartitionBytes:", defaultMaxPartitionBytes)
    print("")
    print("---")
    print("File Count:", len(files))
    print("Actual Bytes:", actual_bytes)
    print("Padded Bytes:", padded_bytes, "Actual_Bytes + (File_Count * Open_Cost)")
    print("Average Size:", (padded_bytes/len(files)))
    print("---")
    print("Open Cost:", open_cost_bytes, "spark.sql.files.openCostInBytes")
    print("Bytes-Per-Core:", bytes_per_core, "padded_bytes / Default Parallelism")
    print("Max Cost:", max_of_cost_BPC, "(max of Open_Cost & Bytes-Per-Core)")
    print("---")
    print("Max Partition Bytes:", max_partition_bytes, "spark.sql.files.maxPartitionBytes")
    print("Target Size:", target_size, "(min of Max_Cost & Max_Partition_Bytes)")
    print("---")
    print("Number of Partions:", math.ceil(partitions), f"({partitions} from Padded_Bytes / Target_Size)")
