# ATM Big Data Project using PySpark (RDD + DataFrame + SQL)

This notebook demonstrates a complete Big Data pipeline using:

- PySpark RDD API  
- DataFrame transformations  
- Spark SQL  
- Fraud detection  
- Temperature alerts  
- Cash summary  
- CSV output (GitHub-friendly)  

Dataset size: 20,000 synthetic ATM transactions by default (set by `NUM_ROWS` in the generator cell).


In [None]:
!pip install pyspark




## Initializing PySpark

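We create a local `SparkSession` that uses all available cores (`local[*]`). The notebook targets Java 11 and Spark 3.4.2, but the install cell above does not pin a PySpark version, so `pip` installs whichever release is current.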


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ATM_PySpark_Project") \
    .master("local[*]") \
    .getOrCreate()

spark


## Generate Synthetic ATM Dataset

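We generate synthetic ATM transaction logs (20,000 rows by default, controlled by `NUM_ROWS`) with these fields:
- ATM ID (`ATM001` to `ATM020`)
- temperature (20 to 80 °C)
- number of ₹500 notes (0 to 300)
- number of ₹200 notes (0 to 400)
- transaction amount (100 to 30,000)
- timestamp (one row per second, ending roughly at generation time)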


In [None]:
import csv, random, time, datetime

OUTPUT = "atm_data.csv"
NUM_ROWS = 20000  # reduce if your runtime is slow

with open(OUTPUT, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["atm_id","temp","cash_500","cash_200","transaction_amount","ts"])

    atm_ids = [f"ATM{str(i).zfill(3)}" for i in range(1, 21)]
    start_time = int(time.time()) - NUM_ROWS

    for i in range(NUM_ROWS):
        atm = random.choice(atm_ids)
        temp = random.randint(20, 80)
        c500 = random.randint(0, 300)
        c200 = random.randint(0, 400)
        amount = random.randint(100, 30000)
        ts = datetime.datetime.fromtimestamp(start_time + i).strftime("%Y-%m-%d %H:%M:%S")
        writer.writerow([atm, temp, c500, c200, amount, ts])

print("Generated:", OUTPUT)


Generated: atm_data.csv
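
The generator above draws from the global `random` state, so every run produces different rows and the counts in later cells will shift. A minimal sketch of a reproducible variant follows, assuming a fixed seed is acceptable for this demo. The helper `make_rows` and its `seed` parameter are hypothetical names, not part of the notebook, and timestamps are left out for brevity.

```python
import random

def make_rows(num_rows, seed=42):
    """Return synthetic ATM rows (no timestamp) from a private, seeded RNG."""
    rng = random.Random(seed)  # private generator; global random state is untouched
    atm_ids = [f"ATM{i:03d}" for i in range(1, 21)]
    rows = []
    for _ in range(num_rows):
        rows.append((
            rng.choice(atm_ids),
            rng.randint(20, 80),      # temp
            rng.randint(0, 300),      # cash_500
            rng.randint(0, 400),      # cash_200
            rng.randint(100, 30000),  # transaction_amount
        ))
    return rows

rows_a = make_rows(1000)
rows_b = make_rows(1000)
print(rows_a == rows_b)  # True: same seed, same rows
```

With a fixed seed, the alert and fraud counts printed further down would stay the same across runs.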


## Load Data into PySpark DataFrame
We load the CSV file with its header row and let Spark infer the column types (`inferSchema=True`), which costs one extra pass over the data.


In [None]:
df = spark.read.csv("atm_data.csv", header=True, inferSchema=True)
df.show(5)
print("Total rows:", df.count())


+------+----+--------+--------+------------------+-------------------+
|atm_id|temp|cash_500|cash_200|transaction_amount|                 ts|
+------+----+--------+--------+------------------+-------------------+
|ATM008|  67|      67|     167|             26872|2025-12-10 10:41:38|
|ATM016|  45|      25|     166|             12748|2025-12-10 10:41:39|
|ATM015|  48|     142|      84|              8473|2025-12-10 10:41:40|
|ATM003|  28|     134|     261|              8975|2025-12-10 10:41:41|
|ATM015|  50|     219|     328|             26019|2025-12-10 10:41:42|
+------+----+--------+--------+------------------+-------------------+
only showing top 5 rows
Total rows: 20000


## RDD Example: High Temperature Alerts
Count all ATM log rows whose temperature exceeds 50°C, using the DataFrame's underlying RDD.


In [None]:
rdd = df.rdd
high_temp = rdd.filter(lambda row: row["temp"] > 50).count()
print("High temperature alerts =", high_temp)


High temperature alerts = 9907
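
The count above can be sanity-checked against the generator, which draws `temp` uniformly from the integers 20 to 80. The sketch below uses plain Python, not Spark. The constants are copied from the generator cell, and the binomial standard deviation assumes independent rows.

```python
import math
from fractions import Fraction

NUM_ROWS = 20000
TEMP_MIN, TEMP_MAX = 20, 80   # bounds passed to random.randint in the generator
THRESHOLD = 50

possible = TEMP_MAX - TEMP_MIN + 1   # 61 equally likely integer temperatures
above = TEMP_MAX - THRESHOLD         # 51..80, i.e. 30 values above the threshold
p_hot = Fraction(above, possible)    # 30/61

expected_alerts = NUM_ROWS * float(p_hot)
std_dev = math.sqrt(NUM_ROWS * float(p_hot) * float(1 - p_hot))

print(f"expected = {expected_alerts:.0f}, std dev = {std_dev:.1f}")
# expected = 9836, std dev = 70.7
```

The observed 9,907 is about one standard deviation above the expectation, which is consistent with uniform sampling.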


## Add a new column 'total_cash'
Each row's note counts are converted to a rupee value: `total_cash = (cash_500 * 500) + (cash_200 * 200)`.


In [None]:
from pyspark.sql.functions import col

df2 = df.withColumn("total_cash", col("cash_500")*500 + col("cash_200")*200)
df2.show(5)



+------+----+--------+--------+------------------+-------------------+----------+
|atm_id|temp|cash_500|cash_200|transaction_amount|                 ts|total_cash|
+------+----+--------+--------+------------------+-------------------+----------+
|ATM008|  67|      67|     167|             26872|2025-12-10 10:41:38|     66900|
|ATM016|  45|      25|     166|             12748|2025-12-10 10:41:39|     45700|
|ATM015|  48|     142|      84|              8473|2025-12-10 10:41:40|     87800|
|ATM003|  28|     134|     261|              8975|2025-12-10 10:41:41|    119200|
|ATM015|  50|     219|     328|             26019|2025-12-10 10:41:42|    175100|
+------+----+--------+--------+------------------+-------------------+----------+
only showing top 5 rows
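
As a quick worked check, the formula can be applied by hand to the five rows shown above. The plain-Python helper below reuses the name `total_cash` for illustration only; the notebook computes the column with `withColumn`.

```python
def total_cash(cash_500, cash_200):
    """Rupee value of the notes: ₹500 notes plus ₹200 notes."""
    return cash_500 * 500 + cash_200 * 200

# (cash_500, cash_200, total_cash) copied from the first rows of the output above
sample_rows = [
    (67, 167, 66900),
    (25, 166, 45700),
    (142, 84, 87800),
    (134, 261, 119200),
    (219, 328, 175100),
]

for c500, c200, shown in sample_rows:
    assert total_cash(c500, c200) == shown

print(total_cash(67, 167))  # 66900 = 33500 + 33400
```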


## Fraud Detection using Spark SQL
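A transaction is flagged as fraud when `transaction_amount` exceeds 10,000. This is a fixed threshold rule: because amounts are drawn uniformly from 100 to 30,000, it flags roughly two-thirds of the synthetic rows.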


In [None]:
df2.createOrReplaceTempView("atm")

fraud_df = spark.sql("""
SELECT atm_id, temp, cash_500, cash_200, transaction_amount, ts
FROM atm
WHERE transaction_amount > 10000
""")

fraud_df.show(10)
print("Fraud count:", fraud_df.count())


+------+----+--------+--------+------------------+-------------------+
|atm_id|temp|cash_500|cash_200|transaction_amount|                 ts|
+------+----+--------+--------+------------------+-------------------+
|ATM008|  67|      67|     167|             26872|2025-12-10 10:41:38|
|ATM016|  45|      25|     166|             12748|2025-12-10 10:41:39|
|ATM015|  50|     219|     328|             26019|2025-12-10 10:41:42|
|ATM016|  61|      29|     293|             26244|2025-12-10 10:41:43|
|ATM006|  64|     229|     234|             22421|2025-12-10 10:41:44|
|ATM020|  27|      39|     230|             17159|2025-12-10 10:41:45|
|ATM020|  32|     265|     344|             18523|2025-12-10 10:41:46|
|ATM019|  66|     112|      16|             10898|2025-12-10 10:41:47|
|ATM013|  57|      11|     109|             21829|2025-12-10 10:41:48|
|ATM017|  28|      25|      12|             29089|2025-12-10 10:41:49|
+------+----+--------+--------+------------------+-------------------+
only showing top 10 rows

## Temperature Alerts (temp > 50°C)


In [None]:
temp_df = df2.filter(col("temp") > 50)
temp_df.show(5)
print("Temperature alert rows:", temp_df.count())


+------+----+--------+--------+------------------+-------------------+----------+
|atm_id|temp|cash_500|cash_200|transaction_amount|                 ts|total_cash|
+------+----+--------+--------+------------------+-------------------+----------+
|ATM008|  67|      67|     167|             26872|2025-12-10 10:41:38|     66900|
|ATM016|  61|      29|     293|             26244|2025-12-10 10:41:43|     73100|
|ATM006|  64|     229|     234|             22421|2025-12-10 10:41:44|    161300|
|ATM019|  66|     112|      16|             10898|2025-12-10 10:41:47|     59200|
|ATM013|  57|      11|     109|             21829|2025-12-10 10:41:48|     27300|
+------+----+--------+--------+------------------+-------------------+----------+
only showing top 5 rows
Temperature alert rows: 9907


## Aggregate Cash Summary per ATM
Compute the average `total_cash` for each ATM ID.


In [None]:
from pyspark.sql.functions import avg

cash_summary = df2.groupBy("atm_id").agg(avg("total_cash").alias("avg_total_cash"))
cash_summary.show(10)


+------+------------------+
|atm_id|    avg_total_cash|
+------+------------------+
|ATM009|115957.54716981133|
|ATM019|116029.37198067633|
|ATM018|114216.96606786427|
|ATM002|114679.09715407262|
|ATM015|117035.53553553554|
|ATM005|115231.67701863353|
|ATM008|116716.66666666667|
|ATM017| 115828.1473899693|
|ATM001|114561.79540709812|
|ATM007|    113175.5859375|
+------+------------------+
only showing top 10 rows
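
The averages above can be compared with the value the generator implies. This sketch assumes the note counts come from `random.randint(0, 300)` and `random.randint(0, 400)` as in the generator cell. The helper `uniform_int_mean` is a hypothetical name introduced here.

```python
def uniform_int_mean(low, high):
    """Mean of random.randint(low, high), which includes both endpoints."""
    return (low + high) / 2

mean_500_notes = uniform_int_mean(0, 300)   # 150.0
mean_200_notes = uniform_int_mean(0, 400)   # 200.0

# Linearity of expectation: E[500*a + 200*b] = 500*E[a] + 200*E[b]
expected_avg_total_cash = 500 * mean_500_notes + 200 * mean_200_notes
print(expected_avg_total_cash)  # 115000.0
```

The per-ATM averages in the output (about 113,000 to 117,000) scatter around this value, as expected when every ATM shares the same distribution.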


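## Save Outputs as CSV

`coalesce(1)` collapses each result to a single partition, so Spark writes one part file into each output directory (`fraud_output/`, `temp_output/`, `cash_summary_output/`).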



In [None]:
fraud_df.coalesce(1).write.mode("overwrite").csv("fraud_output", header=True)
temp_df.coalesce(1).write.mode("overwrite").csv("temp_output", header=True)
cash_summary.coalesce(1).write.mode("overwrite").csv("cash_summary_output", header=True)
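
Spark's CSV writer creates a directory containing a `part-*.csv` file (plus marker files such as `_SUCCESS`), not a single file with the name given. If plainly named files are wanted, for example for a GitHub repository, the part file can be copied out. The helper `export_single_csv` below is a hypothetical name, and the sketch assumes uncompressed output with exactly one part file per directory.

```python
import glob
import os
import shutil

def export_single_csv(spark_output_dir, target_path):
    """Copy the single part file Spark wrote into a plainly named CSV."""
    part_files = glob.glob(os.path.join(spark_output_dir, "part-*.csv"))
    if len(part_files) != 1:
        raise ValueError(f"expected exactly one part file, found {len(part_files)}")
    shutil.copyfile(part_files[0], target_path)
    return target_path

for name in ("fraud_output", "temp_output", "cash_summary_output"):
    if os.path.isdir(name):  # only present after the write cell has run
        export_single_csv(name, f"{name}.csv")
```

Raising an error when the part-file count is not one avoids silently exporting a partial result if `coalesce(1)` is later removed.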


## Download the Generated CSV File

This cell works only in Google Colab and downloads the generated input file `atm_data.csv`.


In [None]:
from google.colab import files
files.download("atm_data.csv")

