In [1]:
import findspark

In [3]:
findspark.init()

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [6]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [7]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [8]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [9]:
# All DataFrames above result same.
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [10]:
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



In [11]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [12]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



In [13]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [14]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [15]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



In [16]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [17]:
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

In [18]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [19]:
df.write.csv('/Users/nikhil/Desktop/self/projects for github/energy data', header=True)
spark.read.csv('/Users/nikhil/Desktop/self/projects for github/energy data', header=True).show()

AnalysisException: path file:/Users/nikhil/Desktop/self/projects for github/energy data already exists.

## practice 

# start

In [72]:
# Import required libraries
from pyspark.sql import SparkSession
import pandas as pd

# Create a SparkSession
spark = SparkSession.builder.appName("ReadExcel").getOrCreate()

# Read Excel file into a Pandas dataframe
df_pandas = pd.read_excel("/Users/nikhil/Desktop/self/projects for github/energy data/table_271.xlsx")

# Convert Pandas dataframe to Spark dataframe
df_spark = spark.createDataFrame(df_pandas)

# Show the data in Spark dataframe
df_spark

Energy Prices Domestic Prices,Unnamed: 1,Unnamed: 2,Unnamed: 3,Unnamed: 4,Unnamed: 5,Unnamed: 6,Unnamed: 7,Unnamed: 8,Unnamed: 9,Unnamed: 10,Unnamed: 11,Unnamed: 12,Unnamed: 13,Unnamed: 14,Unnamed: 15,Unnamed: 16,Unnamed: 17,Unnamed: 18,Unnamed: 19,Unnamed: 20,Unnamed: 21,Unnamed: 22,Unnamed: 23,Unnamed: 24
Transfer statisti...,,,,,,,,,,,,,,,,,,,,,,,,
Publication date:...,,,,,,,,,,,,,,,,,,,,,,,,
Data period: New ...,,,,,,,,,,,,,,,,,,,,,,,,
Next update: 28/0...,,,,,,,,,,,,,,,,,,,,,,,,
About this data,,,,,,,,,,,,,,,,,,,,,,,,
Data in these tab...,,,,,,,,,,,,,,,,,,,,,,,,
Data published in...,,,,,,,,,,,,,,,,,,,,,,,,
Further information,,,,,,,,,,,,,,,,,,,,,,,,
Quarterly Energy ...,,,,,,,,,,,,,,,,,,,,,,,,
Quarterly domesti...,,,,,,,,,,,,,,,,,,,,,,,,


In [71]:
df_spark.columns

['Energy Prices Domestic Prices',
 'Unnamed: 1',
 'Unnamed: 2',
 'Unnamed: 3',
 'Unnamed: 4',
 'Unnamed: 5',
 'Unnamed: 6',
 'Unnamed: 7',
 'Unnamed: 8',
 'Unnamed: 9',
 'Unnamed: 10',
 'Unnamed: 11',
 'Unnamed: 12',
 'Unnamed: 13',
 'Unnamed: 14',
 'Unnamed: 15',
 'Unnamed: 16',
 'Unnamed: 17',
 'Unnamed: 18',
 'Unnamed: 19',
 'Unnamed: 20',
 'Unnamed: 21',
 'Unnamed: 22',
 'Unnamed: 23',
 'Unnamed: 24']

In [67]:
# Specify the path to your CSV file
csv_file_path = "/Users/nikhil/Desktop/self/projects for github/energy data/quarterly_data_test_v2.csv"

# Read the CSV file into a DataFrame
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Electricity_Transfers: integer (nullable = true)
 |-- Gas_Transfers: integer (nullable = true)
 |-- Total_Electricity_Customers: integer (nullable = true)
 |-- Total_Gas_Customers: integer (nullable = true)
 |-- Year_/_Quarter: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



In [44]:
type(df)
df.schema.names

['Year',
 'Quarter',
 'Electricity_Transfers',
 'Gas_Transfers',
 'Total_Electricity_Customers',
 'Total_Gas_Customers',
 'Year_/_Quarter',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11']

In [45]:
columns_to_drop = ['_c7', '_c8', '_c9', '_c10', '_c11']
df = df.drop(*columns_to_drop)

# Show the modified DataFrame
df.show()

+----+-----------+---------------------+-------------+---------------------------+-------------------+----------------+
|Year|    Quarter|Electricity_Transfers|Gas_Transfers|Total_Electricity_Customers|Total_Gas_Customers|  Year_/_Quarter|
+----+-----------+---------------------+-------------+---------------------------+-------------------+----------------+
|2003|Jan to Mar |              1031000|       673000|                       null|               null|2003 Jan to Mar |
|2003|Apr to Jun |              1022000|       713000|                       null|               null|2003 Apr to Jun |
|2003|Jul to Sep |              1066000|       809000|                       null|               null|2003 Jul to Sep |
|2003|Oct to Dec |              1085000|       864000|                       null|               null|2003 Oct to Dec |
|2004| Jan to Mar|              1027000|       893000|                       null|               null| 2004 Jan to Mar|
|2004|Apr to Jun |              1008000|

In [46]:
df.schema.names

['Year',
 'Quarter',
 'Electricity_Transfers',
 'Gas_Transfers',
 'Total_Electricity_Customers',
 'Total_Gas_Customers',
 'Year_/_Quarter']

In [48]:
from pyspark.sql.functions import col, trim

In [49]:
columns_to_trim = ['Year',
 'Quarter',
 'Electricity_Transfers',
 'Gas_Transfers',
 'Total_Electricity_Customers',
 'Total_Gas_Customers',
 'Year_/_Quarter']

# Apply trim function to selected columns
for column in columns_to_trim:
    df = df.withColumn(column, trim(col(column)))

# Show the modified DataFrame
df.show()

+----+----------+---------------------+-------------+---------------------------+-------------------+---------------+
|Year|   Quarter|Electricity_Transfers|Gas_Transfers|Total_Electricity_Customers|Total_Gas_Customers| Year_/_Quarter|
+----+----------+---------------------+-------------+---------------------------+-------------------+---------------+
|2003|Jan to Mar|              1031000|       673000|                       null|               null|2003 Jan to Mar|
|2003|Apr to Jun|              1022000|       713000|                       null|               null|2003 Apr to Jun|
|2003|Jul to Sep|              1066000|       809000|                       null|               null|2003 Jul to Sep|
|2003|Oct to Dec|              1085000|       864000|                       null|               null|2003 Oct to Dec|
|2004|Jan to Mar|              1027000|       893000|                       null|               null|2004 Jan to Mar|
|2004|Apr to Jun|              1008000|       846000|   

In [50]:
print('Data overview')
df.printSchema()
print('Columns overview')
pd.DataFrame(df.dtypes, columns = ['Column Name','Data type'])

Data overview
root
 |-- Year: string (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Electricity_Transfers: string (nullable = true)
 |-- Gas_Transfers: string (nullable = true)
 |-- Total_Electricity_Customers: string (nullable = true)
 |-- Total_Gas_Customers: string (nullable = true)
 |-- Year_/_Quarter: string (nullable = true)

Columns overview


Unnamed: 0,Column Name,Data type
0,Year,string
1,Quarter,string
2,Electricity_Transfers,string
3,Gas_Transfers,string
4,Total_Electricity_Customers,string
5,Total_Gas_Customers,string
6,Year_/_Quarter,string


In [51]:
print('Data frame describe (string and numeric columns only):')
df.describe().toPandas()

print(f'There are total {df.count()} row, Let print first 2 data rows:')
df.limit(2).toPandas()

Data frame describe (string and numeric columns only):
There are total 83 row, Let print first 2 data rows:


Unnamed: 0,Year,Quarter,Electricity_Transfers,Gas_Transfers,Total_Electricity_Customers,Total_Gas_Customers,Year_/_Quarter
0,2003,Jan to Mar,1031000,673000,,,2003 Jan to Mar
1,2003,Apr to Jun,1022000,713000,,,2003 Apr to Jun


In [53]:
string_columns = ['Year',
 'Quarter',
 'Electricity_Transfers',
 'Gas_Transfers',
 'Total_Electricity_Customers',
 'Total_Gas_Customers',
 'Year_/_Quarter']
missing_values = {} 
for index, column in enumerate(df.columns):
    if column in string_columns:    # check string columns with None and Null values
#         missing_count = df.filter(col(column).eqNullSafe(None) | col(column).isNull()).count()
#         missing_values.update({column: missing_count})
        missing_count = df.filter(col(column).eqNullSafe(None) | col(column).isNull()).count()
        missing_values.update({column:missing_count})
missing_df = pd.DataFrame.from_dict([missing_values])
missing_df

Unnamed: 0,Year,Quarter,Electricity_Transfers,Gas_Transfers,Total_Electricity_Customers,Total_Gas_Customers,Year_/_Quarter
0,0,0,0,0,31,31,0


In [54]:
df.describe(['Quarter']).show()

+-------+----------+
|summary|   Quarter|
+-------+----------+
|  count|        83|
|   mean|      null|
| stddev|      null|
|    min|Apr to Jun|
|    max|Oct to Dec|
+-------+----------+



In [55]:
df.describe(['Year']).show()

+-------+-----------------+
|summary|             Year|
+-------+-----------------+
|  count|               83|
|   mean|2012.879518072289|
| stddev|6.027169360207176|
|    min|             2003|
|    max|             2023|
+-------+-----------------+



In [56]:
df.describe(['Electricity_Transfers']).show()

+-------+---------------------+
|summary|Electricity_Transfers|
+-------+---------------------+
|  count|                   83|
|   mean|   1070325.3012048192|
| stddev|   327325.00848745217|
|    min|              1008000|
|    max|               994000|
+-------+---------------------+



In [57]:
df.describe(['Gas_Transfers']).show()

+-------+-----------------+
|summary|    Gas_Transfers|
+-------+-----------------+
|  count|               83|
|   mean|817975.9036144578|
| stddev|279694.5623224933|
|    min|          1014000|
|    max|           999000|
+-------+-----------------+



In [58]:
df.describe(['Total_Electricity_Customers']).show()

+-------+---------------------------+
|summary|Total_Electricity_Customers|
+-------+---------------------------+
|  count|                         52|
|   mean|       2.8278826923076924E7|
| stddev|          594870.3836791527|
|    min|                   27447000|
|    max|                   29495000|
+-------+---------------------------+



In [59]:
df.describe(['Total_Gas_Customers']).show()

+-------+--------------------+
|summary| Total_Gas_Customers|
+-------+--------------------+
|  count|                  52|
|   mean|2.2886192307692308E7|
| stddev|    885863.288670536|
|    min|            21849000|
|    max|            24327000|
+-------+--------------------+



In [60]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import matplotlib.pyplot as plt

In [66]:
# Train a KMeans model
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df['Gas_Transfers'])


TypeError: Column is not iterable

In [64]:
# # Combine features into a single vector column
# vec_assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
# data = vec_assembler.transform(df)

# Train a KMeans model
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df)

# Make predictions
predictions = model.transform(data)

# Extract cluster centers
centers = model.clusterCenters()

# Convert Spark DataFrame to Pandas DataFrame for plotting
pandas_df = predictions.toPandas()

# Plot the data points and cluster centers
plt.scatter(pandas_df['feature1'], pandas_df['feature2'], c=pandas_df['prediction'], cmap='viridis', s=50)
plt.scatter(centers[:, 0], centers[:, 1], c='red', marker='X', s=200, label='Cluster Centers')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('KMeans Clustering')
plt.legend()
plt.show()

IllegalArgumentException: features does not exist. Available: Year, Quarter, Electricity_Transfers, Gas_Transfers, Total_Electricity_Customers, Total_Gas_Customers, Year_/_Quarter