In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
import warnings
warnings.filterwarnings('ignore')

import os
os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/anaconda3/bin/python'

In [3]:
import findspark
findspark.init('/Users/subhasishbhaumik/Documents/Work/spark3')
# Initializing the spark context
import pyspark.pandas as ps
#pdf_incidents = df_incidents.to_pandas_on_spark()
from pyspark.sql import SparkSession

# Configure Spark to use multiple threads
spark = SparkSession.builder.appName("Energy Consumption")\
    .master("local[*]")\
    .config("spark.executorEnv.PYSPARK_PYTHON", "/opt/anaconda3/bin/python")\
    .config("spark.executor.instances", "4")\
    .config("spark.executor.cores", "2")\
    .config("spark.executor.memory", "2g") \
    .getOrCreate()



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/29 18:14:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, rand, explode, sequence, to_date, datediff, expr, lit, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType, TimestampType
import random
from datetime import datetime, timedelta
import numpy as np
from faker import Faker

# Initialize Spark session
#spark = SparkSession.builder.appName("EnergyConsumption").getOrCreate()

# import os
# os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/anaconda3/bin/python'

# Initialize Faker
fake = Faker()

# Constants
START_DATE = datetime(2020, 1, 1)
END_DATE = datetime(2024, 12, 31)
NUM_POWER_PLANTS = 50
NUM_TRANSMISSION_LINES = 200
NUM_SUBSTATIONS = 500
NUM_DISTRIBUTION_NETWORKS = 1000
NUM_CUSTOMERS = 1000000

# Lists of major US cities and their approximate populations
CITIES = [
    ("New York City", 8336817), ("Los Angeles", 3898747), ("Chicago", 2746388),
    ("Houston", 2304580), ("Phoenix", 1608139), ("Philadelphia", 1603797),
    ("San Antonio", 1434625), ("San Diego", 1386932), ("Dallas", 1304379),
    ("San Jose", 1013240), ("Austin", 961855), ("Jacksonville", 911507),
    ("Fort Worth", 909585), ("Columbus", 898553), ("San Francisco", 873965),
    ("Charlotte", 885708), ("Indianapolis", 876384), ("Seattle", 753675),
    ("Denver", 727211), ("Washington", 689545), ("Boston", 675647),
    ("El Paso", 681728), ("Detroit", 639111), ("Nashville", 689447),
    ("Portland", 641162), ("Memphis", 633104), ("Oklahoma City", 649021),
    ("Las Vegas", 651319), ("Louisville", 633045), ("Baltimore", 585708),
    ("Milwaukee", 577222), ("Albuquerque", 564559), ("Tucson", 548073),
    ("Fresno", 542107), ("Sacramento", 513624), ("Mesa", 504258),
    ("Kansas City", 508090), ("Atlanta", 498715), ("Long Beach", 466742),
    ("Omaha", 486051), ("Raleigh", 467665), ("Colorado Springs", 478221),
    ("Miami", 442241), ("Virginia Beach", 459470), ("Oakland", 440646),
    ("Minneapolis", 429606), ("Tulsa", 413066), ("Arlington", 398112),
    ("New Orleans", 383997), ("Wichita", 389255)
]

# Events that might affect energy consumption
EVENTS = [
    ("2020-03-15", "2020-06-30", "COVID-19 Lockdowns", -0.2),
    ("2021-02-13", "2021-02-17", "Texas Winter Storm", 0.5),
    ("2021-06-15", "2021-09-15", "Summer Heatwave", 0.3),
    ("2022-06-01", "2022-08-31", "Energy Price Spike", -0.1),
    ("2023-01-01", "2023-12-31", "Economic Recession", -0.15),
    ("2024-06-01", "2024-08-31", "Olympic Games", 0.2)
]

# UDFs
@udf(returnType=StructType([
    StructField("asset_id", IntegerType(), True),
    StructField("asset_type", StringType(), True)
]))
def generate_asset(asset_id, asset_type):
    return (asset_id, asset_type)

@udf(returnType=StructType([
    StructField("plant_id", IntegerType(), True),
    StructField("plant_name", StringType(), True),
    StructField("capacity", FloatType(), True),
    StructField("city", StringType(), True),
    StructField("asset_id", IntegerType(), True)
]))
def generate_power_plant(plant_id):
    plant_types = ["Coal", "Natural Gas", "Nuclear", "Hydroelectric", "Solar", "Wind"]
    plant_type = random.choice(plant_types)
    capacity = random.uniform(100, 2000)  # MW
    city, _ = random.choice(CITIES)
    return (plant_id, f"{city} {plant_type} Plant", capacity, city, plant_id)

@udf(returnType=StructType([
    StructField("line_id", IntegerType(), True),
    StructField("line_name", StringType(), True),
    StructField("voltage", IntegerType(), True),
    StructField("length", FloatType(), True),
    StructField("plant_id", IntegerType(), True),
    StructField("asset_id", IntegerType(), True)
]))
# def generate_transmission_line(line_id, power_plants):
#     voltages = [110, 220, 345, 500, 765]  # kV
#     plant = random.choice(power_plants)
#     return (line_id, f"Line {line_id}", random.choice(voltages), random.uniform(50, 500), plant[0], line_id)

def generate_transmission_line(line_id):
    voltages = [110, 220, 345, 500, 765]  # kV
    plant = random.choice(power_plants)
    return (line_id, f"Line {line_id}", random.choice(voltages), random.uniform(50, 500),10, line_id)

@udf(returnType=StructType([
    StructField("substation_id", IntegerType(), True),
    StructField("substation_name", StringType(), True),
    StructField("capacity", FloatType(), True),
    StructField("city", StringType(), True),
    StructField("asset_id", IntegerType(), True)
]))
def generate_substation(substation_id):
    city, _ = random.choice(CITIES)
    return (substation_id, f"{city} Substation {substation_id}", random.uniform(100, 1000), city, substation_id)

@udf(returnType=StructType([
    StructField("network_id", IntegerType(), True),
    StructField("network_name", StringType(), True),
    StructField("voltage", FloatType(), True),
    StructField("substation_id", IntegerType(), True),
    StructField("asset_id", IntegerType(), True)
]))


# def generate_distribution_network(network_id, substations):
#     substation = random.choice(substations)
#     return (network_id, f"Network {network_id}", 11.0, substation[0], network_id)

def generate_distribution_network(network_id):
    substation = random.choice(substations)
    return (network_id, f"Network {network_id}", 11.0, substation[0], network_id)

@udf(returnType=StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("network_id", IntegerType(), True)
]))


def generate_customer(customer_id):
    network = random.choice(networks)
    city = next(sub for sub in substations if sub[0] == network[3])[3]
    #print(city)
    addr= random.choice(adrs.filter(adrs.city==city).select([lit(fake.street_address()).alias('street'),'zip','lat','lng','state_name',lit('US').alias('country')]).collect())                    
    #print(addr)
    return(customer_id, fake.name(),f"{addr[0]} ,[{addr[2]},{addr[3]}],{addr[4]},{addr[1]}, {addr[5]}",network[0])



@udf(returnType=StructType([
    StructField("meter_id", IntegerType(), True),
    StructField("meter_type", StringType(), True),
    StructField("installation_date", DateType(), True),
    StructField("customer_id", IntegerType(), True)
]))
def generate_meter(meter_id, customer_id):
    meter_types = ["Smart", "Analog", "Digital"]
    return (meter_id, random.choice(meter_types), fake.date_between(start_date=START_DATE, end_date=END_DATE), customer_id)

@udf(returnType=StructType([
    StructField("consumption_id", IntegerType(), True),
    StructField("meter_id", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("consumption", FloatType(), True)
]))
def generate_consumption(consumption_id, meter_id, date, base_consumption):
    # Strong seasonal variation
    month = date.month
    day_of_year = date.timetuple().tm_yday
    # print(month)
    # print(day_of_year)
    
    # Summer peak (July) and winter peak (January) with smoother transitions
    seasonal_factor = 1 + 0.5 * (np.sin((day_of_year - 15) * 2 * np.pi / 365) + 
                                 0.5 * np.sin((day_of_year - 15) * 4 * np.pi / 365))
    
    # Temperature variation (approximate, you may want to use actual temperature data for more accuracy)
    temp_variation = random.gauss(0, 0.1)  # Random normal distribution
    seasonal_factor += temp_variation
    
    # Weekly pattern (higher consumption on weekdays)
    weekday_factor = 1.1 if date.weekday() < 5 else 0.9
    
    # Apply random daily variation
    daily_variation = random.uniform(0.9, 1.1)
    
    # Apply event effects
    event_effect = 1
    for event_start, event_end, _, effect in EVENTS:
        if datetime.strptime(event_start, "%Y-%m-%d").date() <= date <= datetime.strptime(event_end, "%Y-%m-%d").date():
            event_effect += effect

    
    # Calculate final consumption
    consumption = base_consumption * seasonal_factor * weekday_factor * daily_variation * event_effect
    return (consumption_id, meter_id, date, max(0, consumption))

@udf(returnType=StructType([
    StructField("bill_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("amount", FloatType(), True),
    StructField("consumption_id", IntegerType(), True)
]))
def generate_billing(bill_id, customer_id, consumption_id, consumption, date):
    rate = random.uniform(0.1, 0.2)  # $/kWh
    amount = consumption * rate
    return (bill_id, customer_id, date, amount, consumption_id)

@udf(returnType=StructType([
    StructField("outage_id", IntegerType(), True),
    StructField("start_time", TimestampType(), True),
    StructField("end_time", TimestampType(), True),
    StructField("description", StringType(), True),
    StructField("asset_id", IntegerType(), True)
]))
def generate_outage(outage_id):
    asset = random.choice(assets)
    start_time = fake.date_time_between(start_date=START_DATE, end_date=END_DATE)
    end_time = start_time + timedelta(hours=random.randint(1, 24))
    return (outage_id, start_time, end_time, f"Outage on {asset[1]}", asset[0])


In [7]:
# Generate data
assets_df = spark.range(1, NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES + NUM_SUBSTATIONS + NUM_DISTRIBUTION_NETWORKS + 1) \
    .withColumn("asset_type", when(col("id") <= NUM_POWER_PLANTS, "Power Plant")
                .when(col("id") <= NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES, "Transmission Line")
                .when(col("id") <= NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES + NUM_SUBSTATIONS, "Substation")
                .otherwise("Distribution Network")) \
    .withColumn("asset", generate_asset("id", "asset_type")) \
    .select("asset.*")


In [9]:
power_plants_df = spark.range(1, NUM_POWER_PLANTS + 1) \
    .withColumn("plant", generate_power_plant("id")) \
    .select("plant.*")
power_plants=power_plants_df.collect()
power_plants_df.createOrReplaceTempView("power_plants_df")
## Transmission Lines
transmission_lines_df = spark.range(NUM_POWER_PLANTS + 1, NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES + 1) \
    .withColumn("line", generate_transmission_line("id")) \
    .select("line.*")
transmission_lines_df.createOrReplaceTempView("transmission_lines_df")
substations_df = spark.range(NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES + 1, NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES + NUM_SUBSTATIONS + 1) \
    .withColumn("substation", generate_substation("id")) \
    .select("substation.*")
substations_df.createOrReplaceTempView("substations_df")

                                                                                

In [11]:
substations=substations_df.collect()
distribution_networks_df = spark.range(NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES + NUM_SUBSTATIONS + 1, NUM_POWER_PLANTS + NUM_TRANSMISSION_LINES + NUM_SUBSTATIONS + NUM_DISTRIBUTION_NETWORKS + 1) \
    .withColumn("network", generate_distribution_network("id")) \
    .select("network.*")
distribution_networks_df.createOrReplaceTempView("distribution_networks_df")
#distribution_networks_df.show()

In [107]:
nsql="""
select d.network_id , s.city from
distribution_networks_df d 
inner join substations_df s
on s.substation_id=d.substation_id
"""
network_city = spark.sql(nsql).collect()

In [109]:
import pandas as pd

# Read the CSV file into a pandas DataFrame
address = pd.read_csv('/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/uszips.csv')
@udf(returnType=StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("network_id", IntegerType(), True)
]))
def generate_customer(customer_id):
    network = random.choice(network_city)
    #print(addr)
    # Filter the DataFrame based on a column value
    filtered_df = address[address['city'] == network[1]]
    addr=random.choice(filtered_df[['city','zip','lat','lng','state_name']].values.tolist())
    addr_col=f"{fake.street_address()} ,{addr[0]},{addr[1]},[{addr[2]},{addr[3]}],{addr[4]},US"
    # Print the filtered DataFrame
    addr_col
    return (customer_id, fake.name(),addr_col,network[0])


In [111]:
networks=distribution_networks_df.collect()
customers_df = spark.range(1, NUM_CUSTOMERS + 1) \
    .withColumn("customer", generate_customer("id")) \
    .select("customer.*")
customers_df.show(truncate=False)

+-----------+------------------+--------------------------------------------------------------------------------------+----------+
|customer_id|customer_name     |address                                                                               |network_id|
+-----------+------------------+--------------------------------------------------------------------------------------+----------+
|1          |Marie Perry       |781 Mclean Walk Suite 318 ,Las Vegas,89102,[36.1453,-115.18662],Nevada,US             |857       |
|2          |Carrie Dean       |402 Hernandez Views Apt. 838 ,Oklahoma City,73169,[35.3815,-97.64538],Oklahoma,US     |1370      |
|3          |Gina Hammond      |390 Good Road Suite 912 ,Washington,20018,[38.92596,-76.97281],District of Columbia,US|1060      |
|4          |Rodney Anderson   |73438 Lisa Centers ,Philadelphia,19147,[39.93645,-75.15468],Pennsylvania,US           |1708      |
|5          |Allison Madden    |689 Turner Common ,Boston,2133,[42.35838,-71.06383]

In [113]:
from pyspark.sql.functions import *
#customers_df_n = customers_df.filter(length(col(address))>=0)

customers_df.createOrReplaceTempView("customers_df")
sql_cust="""
select * from customers_df where 
length(address) >=0
"""
spark.sql(sql_cust).count()

24/09/29 18:41:56 ERROR Executor: Exception in task 4.0 in stage 59.0 (TID 264)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/9b/j4zsp5q576v916jdht_wj2s40000gn/T/ipykernel_60010/3665416466.py", line 16, in generate_customer
  File "/opt/anaconda3/lib/python3.12/random.py", line 347, in choice
    raise IndexError('Cannot choose from an empty sequence')
IndexError: Cannot choose from an empty sequence

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(It

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/9b/j4zsp5q576v916jdht_wj2s40000gn/T/ipykernel_60010/3665416466.py", line 16, in generate_customer
  File "/opt/anaconda3/lib/python3.12/random.py", line 347, in choice
    raise IndexError('Cannot choose from an empty sequence')
IndexError: Cannot choose from an empty sequence


24/09/29 18:41:56 WARN TaskSetManager: Lost task 7.0 in stage 59.0 (TID 267) (subonmacbookpro executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 4 in stage 59.0 failed 1 times, most recent failure: Lost task 4.0 in stage 59.0 (TID 264) (subonmacbookpro executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/9b/j4zsp5q576v916jdht_wj2s40000gn/T/ipykernel_60010/3665416466.py", line 16, in generate_customer
  File "/opt/anaconda3/lib/python3.12/random.py", line 347, in choice
    raise IndexError('Cannot choose from an empty sequence')
IndexError: Cannot choose from an empty sequence

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	a

In [25]:
customers_df.count()

1000000

In [27]:
meters_df = spark.range(1, NUM_CUSTOMERS + 1) \
    .withColumn("meter", generate_meter("id", "id")) \
    .select("meter.*")

In [None]:
meters_df.count()

In [None]:
@udf(returnType=StructType([
    StructField("consumption_id", IntegerType(), True),
    StructField("meter_id", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("consumption", FloatType(), True)
]))
def generate_consumption(consumption_id, meter_id, date, base_consumption):
    # Strong seasonal variation
    month = date.month
    day_of_year = date.timetuple().tm_yday
    # print(month)
    # print(day_of_year)
    
    # Summer peak (July) and winter peak (January) with smoother transitions
    seasonal_factor = 1 + 0.5 * (np.sin((day_of_year - 15) * 2 * np.pi / 365) + 
                                 0.5 * np.sin((day_of_year - 15) * 4 * np.pi / 365))
    
    # Temperature variation (approximate, you may want to use actual temperature data for more accuracy)
    temp_variation = random.gauss(0, 0.1)  # Random normal distribution
    seasonal_factor += temp_variation
    
    # Weekly pattern (higher consumption on weekdays)
    weekday_factor = 1.1 if date.weekday() < 5 else 0.9
    
    # Apply random daily variation
    daily_variation = random.uniform(0.9, 1.1)
    
    # Apply event effects
    event_effect = 1
    for event_start, event_end, _, effect in EVENTS:
        if datetime.strptime(event_start, "%Y-%m-%d").date() <= date <= datetime.strptime(event_end, "%Y-%m-%d").date():
            event_effect += effect

    
    # Calculate final consumption
    consumption = base_consumption * seasonal_factor * weekday_factor * daily_variation * event_effect
    return (consumption_id, meter_id, date, max(0, consumption))

In [31]:
import decimal
def generate_consumption(consumption_id, meter_id, date, base_consumption):
    # Strong seasonal variation
    #date=datetime.strptime(date1, '%Y-%m-%d')
    month = date.month
    day_of_year = date.timetuple().tm_yday
    # print(month)
    # print(day_of_year)
    
    # Summer peak (July) and winter peak (January) with smoother transitions
    seasonal_factor = 1 + 0.5 * (np.sin((day_of_year - 15) * 2 * np.pi / 365) + 
                                 0.5 * np.sin((day_of_year - 15) * 4 * np.pi / 365))
    
    # Temperature variation (approximate, you may want to use actual temperature data for more accuracy)
    temp_variation = random.gauss(0, 0.1)  # Random normal distribution
    seasonal_factor += temp_variation
    
    # Weekly pattern (higher consumption on weekdays)
    weekday_factor = 1.1 if date.weekday() < 5 else 0.9
    
    # Apply random daily variation
    daily_variation = random.uniform(0.9, 1.1)
    
    # Apply event effects
    event_effect = 1
    for event_start, event_end, _, effect in EVENTS:
        # if datetime.strptime(event_start, "%Y-%m-%d").date() <= date <= datetime.strptime(event_end, "%Y-%m-%d").date():
        #     event_effect += effect
        if datetime.strptime(event_start, "%Y-%m-%d").date() <= date <= datetime.strptime(event_end, "%Y-%m-%d").date():
            event_effect += effect

    
    # Calculate final consumption
    consumption = base_consumption * seasonal_factor * weekday_factor * daily_variation * event_effect
    return (consumption_id, meter_id, date, decimal.Decimal(round(max(0, consumption),2)))

In [None]:
generate_consumption(1,1,'2020-01-01',200)

In [None]:
# Generate consumption and billing data
date_range = spark.sql(f"SELECT explode(sequence(to_date('{START_DATE}'), to_date('{END_DATE}'), interval 1 day)) as date")
from pyspark.sql.functions import monotonically_increasing_id

consumption_df = meters_df.crossJoin(date_range) \
    .withColumn("base_consumption", rand() * 800 + 200) \
    .withColumn("id",monotonically_increasing_id())\
    .withColumn("consumption", generate_consumption(
        "id",
        "meter_id",
        "date",
        col("base_consumption")/30
    )) \
    .select("consumption.*")



In [35]:
date_range = spark.sql(f"SELECT explode(sequence(to_date('{START_DATE}'), to_date('{END_DATE}'), interval 1 day)) as date")
from pyspark.sql.functions import monotonically_increasing_id
consumption_df = meters_df.crossJoin(date_range) \
    .withColumn("base_consumption", rand() * 800 + 200) \
    .withColumn("id",monotonically_increasing_id())\
    .withColumn("con_inp",col("base_consumption")/30)
consumption_df.show()

[Stage 15:>                                                         (0 + 1) / 1]

+--------+----------+-----------------+-----------+----------+------------------+---+------------------+
|meter_id|meter_type|installation_date|customer_id|      date|  base_consumption| id|           con_inp|
+--------+----------+-----------------+-----------+----------+------------------+---+------------------+
|       1|     Smart|       2024-11-03|          1|2020-01-01|355.69333693815247|  0|11.856444564605082|
|       1|     Smart|       2024-11-03|          1|2020-01-02| 959.9835281652955|  1|31.999450938843186|
|       1|     Smart|       2024-11-03|          1|2020-01-03| 811.0975685634106|  2|27.036585618780354|
|       1|     Smart|       2024-11-03|          1|2020-01-04|246.17747043499287|  3| 8.205915681166429|
|       1|     Smart|       2024-11-03|          1|2020-01-05| 688.3761904070266|  4|22.945873013567553|
|       1|     Smart|       2024-11-03|          1|2020-01-06| 912.5896506989349|  5|30.419655023297828|
|       1|     Smart|       2024-11-03|          1|2020

                                                                                

In [39]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,FloatType,DecimalType

schema = StructType([
    StructField("consumption_id", IntegerType(), True),
    StructField("meter_id", IntegerType(), True),
    StructField("consumption_date", DateType(), True),
    StructField("consumption", DecimalType(), True)
])
_rdd=consumption_df.limit(1000000).rdd
abc=_rdd.map(lambda row: generate_consumption(row[6],row[0],row[4],row[7]))
consumption_df_1=spark.createDataFrame(abc,schema)
consumption_df_1.count()

                                                                                

1000000

In [45]:
consumption_df_1.printSchema()

root
 |-- consumption_id: integer (nullable = true)
 |-- meter_id: integer (nullable = true)
 |-- consumption_date: date (nullable = true)
 |-- consumption: decimal(10,0) (nullable = true)



In [49]:
billing_df = consumption_df_1 \
    .where(expr("day(consumption_date) = 1")) \
    .groupBy("meter_id", "consumption_date") \
    .agg({"consumption": "sum"}) \
    .withColumnRenamed("sum(consumption)", "total_consumption") \
    .withColumn("billing", generate_billing(
        monotonically_increasing_id(),
        "meter_id",
        monotonically_increasing_id(),
        "total_consumption",
        "consumption_date"
    )) \
    .select("billing.*")


In [51]:
billing_df.count()

                                                                                

32841

In [55]:
assets=assets_df.collect()
outages_df = spark.range(1, 1001) \
    .withColumn("outage", generate_outage("id")) \
    .select("outage.*")

# Save DataFrames to CSV files
def save_to_csv(df, filename):
    df.write.csv(filename, header=True, mode="overwrite")

In [59]:

save_to_csv(assets_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/assets')


                                                                                

In [61]:
save_to_csv(power_plants_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/power_plants')


In [63]:
save_to_csv(transmission_lines_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/transmission_lines')


In [65]:
save_to_csv(substations_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/substations')


In [67]:
save_to_csv(distribution_networks_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/distribution_networks')


In [69]:
save_to_csv(customers_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/customers')


24/09/29 18:30:58 ERROR Executor: Exception in task 5.0 in stage 45.0 (TID 202)]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/9b/j4zsp5q576v916jdht_wj2s40000gn/T/ipykernel_60010/3665416466.py", line 16, in generate_customer
  File "/opt/anaconda3/lib/python3.12/random.py", line 347, in choice
    raise IndexError('Cannot choose from an empty sequence')
IndexError: Cannot choose from an empty sequence

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(I

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/9b/j4zsp5q576v916jdht_wj2s40000gn/T/ipykernel_60010/3665416466.py", line 16, in generate_customer
  File "/opt/anaconda3/lib/python3.12/random.py", line 347, in choice
    raise IndexError('Cannot choose from an empty sequence')
IndexError: Cannot choose from an empty sequence


In [None]:
save_to_csv(meters_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/meters')
save_to_csv(consumption_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/consumption')
save_to_csv(billing_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/billing')
save_to_csv(outages_df, '/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/ps/outages')

print("Data generation complete. CSV files have been created.")



In [None]:
spark.stop()

## Appendix

In [None]:
def generate_fake_address(city_name):
    address = {
        "street_address": fake.street_address(),
        "city": city_name,
        "state": fake.state(),
        "postal_code": fake.postcode(),
        "country": fake.country()
    }
    return address

def generate_customer(customer_id):
    network = random.choice(networks)
    city = next(sub for sub in substations if sub[0] == network[3])[3]
    print(city)
    addr= random.choice(adrs.filter(adrs.city==city).select([lit(fake.street_address()).alias('street'),'zip','lat','lng','state_name',lit('US').alias('country')]).collect())                    
    print(addr)
    return(customer_id, fake.name(),f"{addr[0]} ,[{addr[2]},{addr[3]}],{addr[4]},{addr[1]}, {addr[5]}",network[0])
    #return (customer_id, fake.name(), generate_fake_address(city).replace('\n', ', ') , network[0])

In [None]:
generate_customer(1)

In [None]:
def generate_fake_address(city_name):
    Faker.seed(100)
    return {
        "street_address": fake.street_address(),
        "city": city_name,
        "state": fake.state(),
        "postal_code": fake.zipcode(),
        "country": fake.country()
    }

In [None]:
path = "/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/uszips.csv"
adrs = spark.read.csv(path, header=True, inferSchema=True)

In [None]:
adrs.filter(adrs.city=='Miami').select([lit(fake.street_address()),'city','zip','lat','lng','state_name',lit('US')]).collect()

In [None]:
df.printSchema()

In [None]:
random.choices(CITIES, weights=[city[1] for city in CITIES])[0]

In [None]:
import pandas as pd

# Read the CSV file into a pandas DataFrame
address = pd.read_csv('/Users/subhasishbhaumik/Documents/neu/IE6750/project_data/uszips.csv')

# Filter the DataFrame based on a column value
filtered_df = address[address['city'] == 'Miami']
addr=random.choice(filtered_df[['city','zip','lat','lng','state_name']].values.tolist())
addr_col=f"{fake.street_address()} ,{addr[0]},{addr[1]},[{addr[2]},{addr[3]}],{addr[4]},US"
# Print the filtered DataFrame
addr_col

In [None]:
addr=random.choice(filtered_df[['city','zip','lat','lng','state_name']].values.tolist())

In [None]:
f"{fake.street_address()} ,{addr[0]},{addr[1]},[{addr[2]},{addr[3]}],{addr[4]},US"

In [None]:
addr

In [None]:
spark.stop()