# 1. Data Ingestion Pipeline:

In [None]:
"""
a. Design a data ingestion pipeline that collects and stores data from various sources such as databases, 
APIs, and streaming platforms."""

import pandas as pd
from sqlalchemy import create_engine
import requests

# Define the database connection details. Each value is taken from the
# environment when set, so real credentials need not be hard-coded in the
# notebook; the placeholders are only fallbacks.
import os
from urllib.parse import quote_plus

db_username = os.environ.get("DB_USERNAME", "your_username")
db_password = os.environ.get("DB_PASSWORD", "your_password")
db_host = os.environ.get("DB_HOST", "your_host")
db_name = os.environ.get("DB_NAME", "your_database")

# Define the API endpoint URL
api_url = os.environ.get("API_URL", "https://api.example.com/data")

# Connect to the database. Username and password are percent-encoded so that
# characters such as "@", ":" or "/" inside them cannot corrupt the URL.
db_url = (
    f"postgresql://{quote_plus(db_username)}:{quote_plus(db_password)}"
    f"@{db_host}/{db_name}"
)
engine = create_engine(db_url)

# Collect data from databases
def collect_data_from_databases():
    """Read every row of ``table_name`` from the configured database.

    Returns:
        A DataFrame holding the full contents of the table, read through the
        module-level SQLAlchemy ``engine``.
    """
    return pd.read_sql("SELECT * FROM table_name", engine)

# Collect data from APIs
def collect_data_from_api(timeout=10):
    """Fetch JSON records from ``api_url`` and return them as a DataFrame.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        A DataFrame built from the decoded JSON body when the server answers
        HTTP 200, otherwise ``None`` (after printing an error message).
    """
    response = requests.get(api_url, timeout=timeout)
    if response.status_code != 200:
        print("Error fetching data from API")
        # Explicit rather than falling off the end of the function; pd.concat
        # in the pipeline skips None entries.
        return None
    return pd.DataFrame(response.json())

# Store data in the database
def store_data_in_database(df):
    """Append *df* to ``table_name`` through the module-level ``engine``.

    Existing rows are kept (``if_exists="append"``), and the DataFrame index
    is not written as a column (``index=False``).
    """
    df.to_sql(name="table_name", con=engine, if_exists="append", index=False)

# Data ingestion pipeline
def data_ingestion_pipeline():
    """Run the ingestion end to end: extract, merge, preprocess, load.

    Rows are pulled from the database table first and then from the HTTP API.
    They are stacked into a single frame with a fresh index, passed through
    ``preprocess_data``, and appended back into the database.
    """
    sources = [collect_data_from_databases(), collect_data_from_api()]
    combined = pd.concat(sources, ignore_index=True)
    store_data_in_database(preprocess_data(combined))

# Preprocessing function for data cleaning and transformation
def preprocess_data(df):
    """Clean and transform the merged ingestion frame.

    Placeholder: no cleaning or transformation steps are implemented yet, so
    the input is returned unchanged (the same object, not a copy).
    """
    # TODO: add cleaning / transformation steps here.
    return df

# Execute the data ingestion pipeline. This runs as soon as the cell executes,
# and it needs the database behind `engine` and the API at `api_url` to be
# reachable.
data_ingestion_pipeline()


In [None]:
'''
   b. Implement a real-time data ingestion pipeline for processing sensor data from IoT devices.
'''

from pykafka import KafkaClient
from pykafka.common import OffsetType
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.table import StreamTableEnvironment

# Kafka broker details
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "sensor-data-topic"


# Custom function to process the sensor data. It must be defined *before* the
# job graph is built and executed: env.execute() blocks for an unbounded Kafka
# stream, so a definition placed after it would never exist when the map
# operator needs it.
def process_sensor_data(data):
    """Tag one raw sensor record as processed.

    Args:
        data: A Kafka record value, decoded to ``str`` by SimpleStringSchema.

    Returns:
        The record with " processed" appended.
    """
    processed_data = data + " processed"
    return processed_data


from pyflink.common.typeinfo import Types

# Flink execution environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

# Kafka consumer properties
properties = {
    "bootstrap.servers": kafka_bootstrap_servers,
    "group.id": "sensor-data-consumer",
}

# Configure Kafka consumer. FlinkKafkaConsumer takes (topics, deserialization
# schema, properties) only; it has no start_from_latest/offset_reset keyword
# arguments (OffsetType belongs to pykafka, not Flink). The start position is
# selected with a method call instead.
consumer = FlinkKafkaConsumer(
    kafka_topic,
    SimpleStringSchema(),
    properties=properties,
)
consumer.set_start_from_latest()  # Start consuming from the latest offset

# Add Kafka consumer as a data source.
# NOTE(review): the Flink Kafka connector JAR normally has to be on the
# classpath (e.g. env.add_jars(...)) — confirm for the target Flink version.
sensor_data_stream = env.add_source(consumer)

# Process the sensor data stream; the result type is declared explicitly so
# downstream operators receive plain strings.
processed_data_stream = sensor_data_stream.map(
    process_sensor_data, output_type=Types.STRING()
)

# Define sink for the processed data (e.g., writing to a database, sending to another system)
processed_data_stream.print()

# Execute the streaming job (blocks while the unbounded stream is consumed)
env.execute("Real-time Sensor Data Processing Pipeline")


In [None]:
'''
 c. Develop a data ingestion pipeline that handles data from different file formats (CSV, JSON, etc.) 
 and performs data validation and cleansing.
'''


import pandas as pd

# Input files consumed by the pipeline below; relative paths are resolved
# against the current working directory.
csv_file_path, json_file_path = "data.csv", "data.json"

# Define data validation and cleansing functions
def validate_data(df):
    """Check that *df* is usable tabular input before it is cleansed.

    A function body made only of comments is an IndentationError, so this
    performs minimal real checks instead.

    Raises:
        TypeError: If *df* is not a pandas DataFrame.
        ValueError: If *df* has no columns at all.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError(f"expected a pandas DataFrame, got {type(df).__name__}")
    if df.columns.empty:
        raise ValueError("input data has no columns")

def cleanse_data(df):
    """Return a cleansed copy of *df*; the input frame is not modified.

    Rows in which every value is missing are removed, then exact duplicate
    rows are removed, and the index is renumbered from 0.

    NOTE: drop_duplicates needs hashable cell values; nested JSON (list/dict
    cells) would have to be flattened first, e.g. with pd.json_normalize.
    """
    cleansed = df.dropna(how="all").drop_duplicates()
    return cleansed.reset_index(drop=True)

# Data ingestion pipeline
def data_ingestion_pipeline(csv_path=None, json_path=None):
    """Read the CSV and JSON inputs, validate and cleanse each, and merge them.

    Args:
        csv_path: CSV file to read. Defaults to the module-level
            ``csv_file_path`` when omitted.
        json_path: JSON file to read. Defaults to the module-level
            ``json_file_path`` when omitted.

    Returns:
        The merged, cleansed frame after ``process_data``.
    """
    # Resolve defaults at call time so later reassignments of the module-level
    # paths are honoured.
    if csv_path is None:
        csv_path = csv_file_path
    if json_path is None:
        json_path = json_file_path

    # Read, validate and cleanse the CSV file
    csv_data = pd.read_csv(csv_path)
    validate_data(csv_data)
    cleansed_csv_data = cleanse_data(csv_data)

    # Read, validate and cleanse the JSON file
    json_data = pd.read_json(json_path)
    validate_data(json_data)
    cleansed_json_data = cleanse_data(json_data)

    # Stack both sources into one frame with a fresh 0..n-1 index, then process
    merged_data = pd.concat([cleansed_csv_data, cleansed_json_data], ignore_index=True)
    processed_data = process_data(merged_data)

    # Hand the result back so the caller can store or inspect it.
    return processed_data

# Custom function to process the merged data
def process_data(df):
    """Apply downstream processing to the merged, cleansed frame.

    Placeholder: no processing steps are implemented yet, so *df* is returned
    unchanged (the same object, not a copy).
    """
    # A body made only of comments is a syntax error; return the frame as-is
    # until real processing steps are added.
    return df

# Execute the data ingestion pipeline. This reads `csv_file_path` and
# `json_file_path` when the cell runs; no value is captured here.
data_ingestion_pipeline()


# 2. Model Training:


In [None]:
'''
 a. Build a machine learning model to predict customer churn based on a given dataset. 
 Train the model using appropriate algorithms and evaluate its performance.
'''

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


# Load the dataset (replace 'dataset.csv' with the actual file name)
dataset = pd.read_csv('dataset.csv')

# Split the dataset into features (X) and target (y).
# NOTE(review): assumes every feature column is numeric (StandardScaler rejects
# strings) and that 'churn' is encoded 0/1, since precision/recall/F1 below use
# the default pos_label=1. Confirm against the actual dataset.
X = dataset.drop('churn', axis=1)
y = dataset['churn']

# Split the data into training and testing sets. stratify=y keeps the
# churn / non-churn ratio identical in both splits. Churners are typically the
# minority class, and an unstratified 20% test split can under-represent them.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Perform feature scaling (fitted on the training split only, to avoid leakage)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)


from sklearn.ensemble import RandomForestClassifier

# Choose a machine learning model. A fixed random_state makes the forest, and
# therefore the reported metrics, reproducible (matching the split's seed).
model = RandomForestClassifier(random_state=42)

# Train the model
model.fit(X_train_scaled, y_train)
# Make predictions on the test set
y_pred = model.predict(X_test_scaled)

# Calculate evaluation metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

# Print the evaluation metrics
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1}")


In [None]:
'''
 b. Develop a model training pipeline that incorporates feature engineering techniques such as one-hot encoding, 
 feature scaling, and dimensionality reduction.
'''

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression

# Load the dataset (replace 'dataset.csv' with the actual file name)
dataset = pd.read_csv('dataset.csv')

# Split the dataset into features (X) and target (y)
X = dataset.drop('target', axis=1)
y = dataset['target']

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create a pipeline for feature engineering and model training
pipeline = make_pipeline(
    OneHotEncoder(),   # One-hot encode categorical variables
    StandardScaler(),  # Scale numeric variables
    PCA(n_components=0.95),  # Perform dimensionality reduction with PCA
    LogisticRegression()  # Train the logistic regression model
)

# Train the model
pipeline.fit(X_train, y_train)

# Evaluate the model
accuracy = pipeline.score(X_test, y_test)
print(f"Accuracy: {accuracy}")


In [1]:
'''
c. Train a deep learning model for image classification using transfer learning and fine-tuning techniques.
'''

# Not implemented: deep learning has not yet been covered in the FSDS 2.0 batch.

'\nc. Train a deep learning model for image classification using transfer learning and fine-tuning techniques.\n'

# 3. Model Validation:


In [None]:
'''
a. Implement cross-validation to evaluate the performance of a regression model for predicting housing prices.'''

import pandas as pd
import numpy as np
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LinearRegression

# Load the dataset (replace 'dataset.csv' with the actual file name)
dataset = pd.read_csv('dataset.csv')

# Split the dataset into features (X) and target (y)
X = dataset.drop('price', axis=1)
y = dataset['price']

# Create a regression model
model = LinearRegression()

from sklearn.model_selection import KFold

# Perform 5-fold cross-validation. For a regressor, cv=5 means KFold *without*
# shuffling, so an ordered file (e.g. sorted by price or region) gives each
# fold a skewed slice. Shuffling with a fixed seed avoids that and stays
# reproducible.
cv = KFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(model, X, y, cv=cv, scoring='neg_mean_squared_error')

# Convert the negative mean squared errors to positive values
mse_scores = -cv_scores

# Calculate the root mean squared error (RMSE) from the mean of the fold MSEs
rmse = np.sqrt(mse_scores.mean())

print(f"Root Mean Squared Error (RMSE): {rmse}")

# Per-fold spread shows how stable the estimate is across folds
fold_rmse = np.sqrt(mse_scores)
print(f"Per-fold RMSE: mean={fold_rmse.mean():.4f}, std={fold_rmse.std():.4f}")


In [None]:
'''
 b. Perform model validation using different evaluation metrics such as accuracy, precision, recall, 
 and F1 score for a binary classification problem.
'''

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.linear_model import LogisticRegression

# Load the dataset (replace 'dataset.csv' with the actual file name)
dataset = pd.read_csv('dataset.csv')

# Separate the label column from the feature matrix
y = dataset['target']
X = dataset.drop(columns='target')

# Hold out 20% of the rows for evaluation, with a fixed seed for reproducibility
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit a logistic-regression classifier on the training rows
# (fit() returns the estimator itself)
model = LogisticRegression().fit(X_train, y_train)

# Predict labels for the held-out rows
y_pred = model.predict(X_test)

# Score the predictions with each metric in turn
accuracy, precision, recall, f1 = (
    scorer(y_test, y_pred)
    for scorer in (accuracy_score, precision_score, recall_score, f1_score)
)

# Report one metric per line
print(
    f"Accuracy: {accuracy}",
    f"Precision: {precision}",
    f"Recall: {recall}",
    f"F1-Score: {f1}",
    sep="\n",
)


In [None]:
'''
c. Design a model validation strategy that incorporates stratified sampling to handle imbalanced datasets.'''

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression

# Load the dataset (replace 'dataset.csv' with the actual file name)
dataset = pd.read_csv('dataset.csv')

# Split the dataset into features (X) and target (y)
X = dataset.drop('target', axis=1)
y = dataset['target']

# Split the data into training and testing sets using stratified sampling
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

# Create a binary classification model
model = LogisticRegression()

from sklearn.metrics import balanced_accuracy_score, f1_score
from sklearn.model_selection import cross_val_score

# Stratified k-fold cross-validation on the training split. Every validation
# fold keeps the class ratio, so the minority class is present in each fold.
# cross_val_score fits clones, leaving `model` itself unfitted here.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_balanced = cross_val_score(model, X_train, y_train, cv=skf, scoring="balanced_accuracy")
print(f"CV balanced accuracy: {cv_balanced.mean():.4f} (+/- {cv_balanced.std():.4f})")

# Train the model on the training set
model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = model.predict(X_test)

# Calculate accuracy
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy}")

# Plain accuracy is misleading on imbalanced data (always predicting the
# majority class scores high). Balanced accuracy and macro F1 weight every
# class equally.
balanced_accuracy = balanced_accuracy_score(y_test, y_pred)
macro_f1 = f1_score(y_test, y_pred, average="macro")
print(f"Balanced accuracy: {balanced_accuracy}")
print(f"Macro F1-Score: {macro_f1}")


# 4. Deployment Strategy:
