In [None]:
# This is an introduction to Python concepts for Data Engineering.
# It is broken down into 3 parts and uses a scenario-based learning approach.

In [None]:
# Variables, Data Types, and Operators

# Variables -> named areas within memory used to store information
# Data types -> determine the kind of data stored in that memory space
# Operators -> symbols that perform operations (arithmetic, comparison, logical) on values

In [None]:
# What is a variable? -> a named area within memory used to store information.
# Two numeric variables, assigned together:
#   num_customers -> int   (number of customers)
#   average_spend -> float (average amount spent per customer)
num_customers, average_spend = 5000, 75.30

# Total revenue = number of customers x average spend per customer
total_revenue = average_spend * num_customers

# Show the value and its data type (int * float produces a float)
print("Total Revenue:", total_revenue)
print("Type of Total Revenue:", type(total_revenue))




Total Revenue: 376500.0
Type of Total Revenue: <class 'float'>


In [30]:
# STRING VARIABLE AND DATA TYPE
# Strings store text, e.g. a customer's name or the status of an operation.
transaction_status = "Completed"  # status of a transaction
customer_name = "John Doe"        # name of a customer

# Print the name, then the status, one per line
print(customer_name, transaction_status, sep="\n")



John Doe
Completed


In [None]:
# Operations -> actions performed on data (numbers, text, etc.)
# There are different types of operations; a few are covered here.

# 1. Arithmetic Operations
# Example: Sales Data Processing
# Inputs: sales so far and the target (both in dollars), plus the customer count
total_sales, sales_target = 10000, 15000
num_customers = 250

# Key metrics
discount_rate = 10 / 100                       # 10% discount expressed as a fraction
discount_amount = discount_rate * total_sales  # discount given on total sales
sales_gap = sales_target - total_sales         # how much more is needed to reach target
average_spend = total_sales / num_customers    # average spend per customer

# Report the metrics
print("Average Spend per Customer:", average_spend)
print("Sales Gap to Target:", sales_gap)
print("Total Discount Amount:", discount_amount)


# Scenarios 
# 1. Aggregating Data – Summing transaction amounts for revenue calculations.
# 2. Calculating Metrics – Computing averages, profit margins, or growth percentages.
# 3. Data Cleaning – Normalizing values by applying mathematical transformations.
# 4. ETL Pipelines – Adjusting timestamps, partitioning datasets, and handling missing data.


Average Spend per Customer: 40.0
Sales Gap to Target: 5000
Total Discount Amount: 1000.0


In [None]:
# A Boolean is a data type with exactly two possible values: True and False.
first_value, second_value = True, False


# Boolean operators (and / or / not) combine or compare Boolean values.
# State of a pretend data load:
data_loaded = True
error_count = 5
has_errors = error_count > 0  # a comparison evaluates to a Boolean

# Decision-making: processing is blocked when errors exist OR nothing was loaded
if has_errors or not data_loaded:
    print("Data has errors, needs fixing")
else:
    print("Data is ready for processing")

# Flag a transaction when the data had errors or the amount exceeds 3000
transaction_amount = 5000
is_fraudulent = has_errors or transaction_amount > 3000

print("Is the transaction fraudulent?", is_fraudulent)


# Scenarios to use them 
# 1. DATA VALIDATION 
# 2. FILTERING IN PIPELINES 
# 3. HANDLING ERRORS IN DATA PROCESSING 




Data has errors, needs fixing
Is the transaction fraudulent? True


In [None]:
# LOGIC OPERATIONS
# Logical operations work on Boolean values (True or False) and help make decisions.
#
# Types of logical operations:
#   and -> True only when both values are True
#   or  -> True when at least one value is True
#   not -> returns the opposite of the original value
a, b = True, False

c = not a
d = a or b
e = a and b
print(c, d, e, sep="\n")  # one result per line, in the order: not, or, and


# Examples
# Data processing checks
data_loaded, has_errors = True, False
# Ready only if data is loaded and has no errors
# (written as: NOT (has errors OR nothing loaded))
is_ready = not (has_errors or not data_loaded)

# Fraud detection logic: flagged when the data had errors or the amount exceeds 3000
transaction_amount = 5000
is_fraud = has_errors or transaction_amount > 3000

# Printing results
print("Is the data ready for processing?", is_ready)
print("Is the transaction fraudulent?", is_fraud)


# SCENARIOS 
# 1. Filtering data in ETL pipelines 
# 2. Data Quality Checks 
# 3. Automating decision-making in pipelines (used with if and else statements)






False
True
False
Is the data ready for processing? True
Is the transaction fraudulent? True
Proceed with processing


In [None]:
# COMPARISON OPERATORS
# They compare two values and evaluate to True or False.
# <, >, >=, <=, ==, !=, is, is not

# Examples
# Sales Data Analysis: customers with sales above 10000 are VIPs
sales = 12000
if sales > 10000:
    customer_status = "VIP"
else:
    customer_status = "Regular"

# Error Handling in Data Processing: warn once 5 or more errors are seen
error_count = 7
if 5 <= error_count:
    print("Warning: Too many errors in data!")

# Checking for Null Values: `is None` tests identity with the None object
data = None
if data is None:
    print("No data available for processing")

# Transaction Fraud Detection: amounts above 3000 need a manual review
transaction_amount = 5000
if 3000 < transaction_amount:
    print("High-value transaction detected. Further review needed.")

# Print results
print("Customer Status:", customer_status)



# scenarios to use comparative operators 
# 1. Filtering Data in ETL Pipelines
# 2. Data Validation
# 3. Flagging Anomalies
# 4. Detecting Late Transactions






Warning: Too many errors in data!
No data available for processing
High-value transaction detected. Further review needed.
Customer Status: VIP


In [None]:
# Control Flow
# Control flow is how the code makes decisions and loops through data.

# IF STATEMENT
# Runs one branch when the condition is True and the other when it is False.

# Example
record_count = 5000

# Handle the "nothing to ingest" case first, otherwise proceed
if record_count <= 0:
    print("No records found, check data source")
else:
    print("Data is available for ingestion")

# Scenarios 
# Validating Data Before Ingestion
# Handling Missing Values
# Determining Data Processing Logic


Data is available for ingestion


In [None]:
# FOR LOOP 
# Iterate Over Data


# A for loop visits each item of a collection in order.
# Here: handle every customer ID in the list, one at a time.
customer_ids = [
    101,
    102,
    103,
    104,
    105,
]

for customer_id in customer_ids:
    # The loop variable holds the current ID on each pass
    print(f"Processing customer ID: {customer_id}")


# WHILE LOOP
# Runs for as long as its condition is True.
# Here: poll for data until it arrives or the attempts run out.
data_available = False
attempts = 0
max_attempts = 3
# Simulation only: the attempt on which the data "arrives". Kept as its own
# name so it can be changed independently of max_attempts.
data_ready_on_attempt = 3

while not data_available and attempts < max_attempts:
    print("Waiting for data...")
    attempts += 1
    # Simulate data becoming available
    if attempts == data_ready_on_attempt:
        data_available = True
        print("Data is available")

# The loop also ends when attempts run out; report that case instead of
# finishing silently.
if not data_available:
    print(f"Data still not available after {max_attempts} attempts")


# Scenarios where loops are used
# Iterating Through Data Records in an ETL Pipeline
records = [
    (101, 'John Doe'),
    (102, 'Jane Smith'),
]
for record in records:
    print(record)  # stand-in for processing each record in the pipeline

# Data Transformation
data = [100, 200, 300, 400]
for i, value in enumerate(data):
    data[i] = value * 1.1  # apply a 10% increase to each record, in place
print(data)





Processing customer ID: 101
Processing customer ID: 102
Processing customer ID: 103
Processing customer ID: 104
Processing customer ID: 105
Waiting for data...
Waiting for data...
Waiting for data...
Data is available
(101, 'John Doe')
(102, 'Jane Smith')
[110.00000000000001, 220.00000000000003, 330.0, 440.00000000000006]


In [None]:
# FUNCTIONS 
# A function is a reusable block of code designed to perform a specific task.

# Examples 
# A function with no parameters that returns a greeting message
def greet():
    """Return the fixed greeting text."""
    greeting = "Hello, Data Engineer!"
    return greeting

# Call the function and print what it returns
print(greet())  # Output: Hello, Data Engineer!


# Scenarios
# 1. Data Transformation in ETL Pipelines
def clean_data(data):
    """Return a new list with each string stripped of surrounding
    whitespace and converted to lower case.

    Every entry must be a string: ``.strip()`` is called on each one.
    """
    cleaned = []
    for entry in data:
        cleaned.append(entry.strip().lower())
    return cleaned

raw_data = ["  Hello  ", "   World   ", "  Data  "]
cleaned_data = clean_data(raw_data)
print(cleaned_data)  # Output: ['hello', 'world', 'data']

# 2. Automating Data Validation
def validate_data(data):
    """Return True when no entry in ``data`` is None, otherwise False.

    Stops at the first None found. An empty input is considered valid.
    """
    for value in data:
        if value is None:
            return False
    return True

data = [1, 2, 3, None]
print(validate_data(data))  # Output: False

# 3. Batch Processing of Records
def process_batch(batch):
    """Simulate processing a batch by doubling every record.

    Returns a new list; the input batch is left untouched.
    """
    doubled = []
    for record in batch:
        doubled.append(record * 2)
    return doubled

batch_data = [100, 200, 300]
processed_data = process_batch(batch_data)
print(processed_data)  # Output: [200, 400, 600]





Hello, Data Engineer!
['hello', 'world', 'data']
False
[200, 400, 600]


In [None]:
# OBJECT ORIENTED PROGRAMMING 
# Object-oriented programming (OOP) is a way of structuring code using objects and classes.
# A class is like a blueprint for creating objects.

# An object is an instance of a class. It models a real-world thing and has:
# Data (attributes, like a customer’s name or transaction amount)
# Actions (methods, like filtering records or processing files)

# Example 
class Transaction:
    """A single transaction: an identifier, an amount and a status."""

    def __init__(self, transaction_id, amount, status):
        """Store the three fields on the instance unchanged."""
        self.transaction_id, self.amount, self.status = transaction_id, amount, status

    def is_successful(self):
        """Return True when the status equals "Completed" (exact, case-sensitive match)."""
        current_status = self.status
        return current_status == "Completed"
    

# Scenarios 
# ETL PIPELINES 
class Extractor:
    """Extract step of the demo pipeline; it only describes the action."""

    def extract(self, source):
        """Return a status message naming ``source``; no data is actually read."""
        return "Extracting data from {}".format(source)

class Transformer:
    """Transform step of the demo pipeline; it only describes the action."""

    def transform(self, data):
        """Return a status message naming ``data``; the data itself is not changed."""
        return "Transforming {}".format(data)

class Loader:
    """Load step of the demo pipeline; it only describes the action."""

    def load(self, data):
        """Return a status message naming ``data``; nothing is written anywhere."""
        return "Loading data into database {}".format(data)
    

# DATA PROCESSING AND ANALYSIS 
class DataCleaner:
    """Cleans a list of raw text values."""

    def clean_data(self, data):
        """Strip surrounding whitespace from each entry and drop the empty ones.

        Falsy entries (None, "") are skipped before stripping. Entries made
        up only of whitespace are dropped after stripping, so the result
        never contains empty strings. Remaining entries must be strings.
        """
        stripped = (item.strip() for item in data if item)
        # Second filter: "   " is truthy, but strips down to "".
        return [value for value in stripped if value]

class Aggregator:
    """Computes a summary value over numeric data."""

    def aggregate_data(self, data):
        """Return the arithmetic mean of ``data``.

        ``data`` must work with both ``sum()`` and ``len()``. An empty
        sequence raises ZeroDivisionError.
        """
        total = sum(data)
        count = len(data)
        return total / count


# TASK AUTOMATION 
class DataJob:
    """A named job identified by an id and a type."""

    def __init__(self, job_id, job_type):
        """Store the job id and job type on the instance unchanged."""
        self.job_id, self.job_type = job_id, job_type

    def run(self):
        """Return a status message with the job type and id; no work is performed."""
        return "Running {} job with ID: {}".format(self.job_type, self.job_id)


# Define sample data (simulating raw extracted data)
raw_data = [" 120 ", " 300", " 450 ", " 600 ", None, " 750", "  900"]

# Creating instances of each class
extractor = Extractor()
transformer = Transformer()
loader = Loader()
cleaner = DataCleaner()
aggregator = Aggregator()
job = DataJob(job_id=202, job_type="ETL")

# Clean once and reuse the result, so the average is computed from the same
# records that were cleaned instead of from a hand-copied list of numbers.
cleaned_values = cleaner.clean_data(raw_data)  # strips whitespace, drops None
numeric_values = [int(value) for value in cleaned_values]  # strings -> integers

# Run ETL pipeline steps
pipeline_steps = [
    extractor.extract("list_of_numbers"),  # Extracting from a source
    transformer.transform("raw list data"),  # Simulating transformation step
    cleaned_values,  # Cleaned strings
    aggregator.aggregate_data(numeric_values),  # Average of the cleaned values
    loader.load("final processed data"),  # Loading transformed data
    job.run()  # Running the ETL job
]

# Print each step's result
for step in pipeline_steps:
    print(step)





Extracting data from list_of_numbers
Transforming raw list data
['120', '300', '450', '600', '750', '900']
520.0
Loading data into database final processed data
Running ETL job with ID: 202


In [4]:
# INHERITANCE 
# Inheritance means that a child class can reuse and extend the functionality of a parent class.

# Example 
# Base class (Parent) - General Extractor
class Extractor:
    """Base extractor: holds a data source and returns it unchanged."""

    def __init__(self, source):
        self.source = source  # Can be a list

    def extract(self):
        """Return the stored source as-is."""
        return self.source


# Child class - Extractor for List Data
class ListExtractor(Extractor):
    """Extractor for a list of strings; reuses ``__init__`` from Extractor."""

    def extract(self):
        """Return the non-empty entries of the source, stripped of whitespace.

        Falsy entries (None, "") are skipped before stripping. Entries made
        up only of whitespace are dropped after stripping, so no empty
        string is passed on to a later step such as ``int()`` conversion.
        """
        print("Extracting data from a list...")
        stripped = (item.strip() for item in self.source if item)
        # Second filter: "   " is truthy, but strips down to "".
        return [value for value in stripped if value]


# Transformer and Loader classes (redefined for this example: they now work on real data instead of returning status strings)
class Transformer:
    """Transform step: converts extracted values to integers."""

    def transform(self, data):
        """Print a progress message, then return each item converted with ``int()``.

        Any item that ``int()`` cannot convert raises its error to the caller.
        """
        print("Transforming data...")
        return list(map(int, data))

class Loader:
    """Load step: reports the data it was given."""

    def load(self, data):
        """Print a progress message, then return a summary string containing ``data``."""
        print("Loading data into system...")
        summary = "Loaded Data: {}".format(data)
        return summary


# Sample list data (simulating extracted data): padded numeric strings plus one None
raw_data = [" 100 ", " 250", None, " 400 ", " 550 ", " 700", " 900 "]

# Build the three pipeline stages; ListExtractor inherits from Extractor
extractor, transformer, loader = ListExtractor(raw_data), Transformer(), Loader()

# Run the ETL pipeline: each stage consumes the previous stage's result
extracted_data = extractor.extract()
transformed_data = transformer.transform(extracted_data)
loaded_data = loader.load(transformed_data)

# Print the results: a blank line, the heading, then the loaded data
print("\nFinal Processed Data:", loaded_data, sep="\n")


Extracting data from a list...
Transforming data...
Loading data into system...

Final Processed Data:
Loaded Data: [100, 250, 400, 550, 700, 900]


In [None]:
# EXCEPTION HANDLING 
# Exception handling is a way to catch and handle errors in a program without stopping the entire process.
# SCENARIOS TO USE EXCEPTION HANDLING 

# Example
# Handling Missing Data in an ETL Pipeline
def process_record(record):
    """Return the record's amount multiplied by 1.2, or None if it is unusable.

    ``record`` is a dict-like object with a required 'amount' field.
    A missing 'amount' key, or a value that ``float()`` cannot convert
    (wrong text such as "abc", or a non-numeric type such as None),
    prints a message and returns None instead of raising.
    """
    try:
        # Assume 'amount' is a required field
        raw_amount = record["amount"]
    except KeyError:
        print(f"Missing 'amount' key in record: {record}")
        return None  # Return None instead of failing

    try:
        amount = float(raw_amount)
    except (TypeError, ValueError):
        # ValueError: unparsable text. TypeError: None, list, dict, ...
        print(f"Invalid amount value: {raw_amount}")
        return None

    return amount * 1.2  # Some transformation

# Example records
record1 = {"id": 1, "amount": "100"}
record2 = {"id": 2}  # Missing 'amount'
record3 = {"id": 3, "amount": "abc"}  # Invalid amount

print(process_record(record1))  # 120.0
print(process_record(record2))  # Logs error, returns None
print(process_record(record3))  # Logs error, returns None

# Catching Division by Zero in Data Processing
def calculate_ratio(total, count):
    """Return ``total / count``.

    If the division raises ZeroDivisionError, print a message and return
    the default value 0 instead.
    """
    try:
        ratio = total / count
    except ZeroDivisionError:
        print("Division by zero error! Returning default value.")
        ratio = 0  # Default value
    return ratio

print(calculate_ratio(100, 5))  # Output: 20.0
print(calculate_ratio(100, 0))  # Output: Logs error, returns 0






120.0
Missing 'amount' key in record: {'id': 2}
None
Invalid amount value: abc
None
20.0
Division by zero error! Returning default value.
0


In [None]:
# THANK YOU