In [None]:
import json
import os
from urllib.parse import quote_plus, urlencode

import kafka
import pandas as pd
import requests
from dotenv import load_dotenv
from sqlalchemy import create_engine



In [None]:
# Switch the working directory to the project checkout so the relative path
# used when loading the .env file resolves against it, then echo the result.
project_root = r"E:\LocalProjects\GitRepos\End2EndDataEngineeringPipeline"
os.chdir(project_root)
os.getcwd()

In [None]:
# Load environment variables from the producer's .env file (path is relative to
# the current working directory). The path is assembled with os.path.join
# rather than a plain string literal: the literal's "\p" and "\." are invalid
# escape sequences (flagged with a warning by recent Python versions) and its
# hard-coded backslashes only resolve on Windows.
load_dotenv(dotenv_path=os.path.join("StockMarketAnalysis", "python_api_producer", ".env"))

In [None]:
# Database connection settings taken from the environment. A variable that is
# not set yields None. NOTE: `post` holds the value of DB_PORT; the name is
# kept because the cell that builds the connection URL refers to it.
username, password, database, host, post = (
    os.getenv(name)
    for name in ('DB_USER', 'DB_PASS', 'DB_NAME', 'DB_HOST', 'DB_PORT')
)
print(username)

In [None]:
# Read the ind_nifty500list.csv download into a DataFrame and preview the
# first five rows.
path = r"C:\Users\uttka\Downloads\ind_nifty500list.csv"
nifty500 = pd.read_csv(filepath_or_buffer=path)
display(nifty500.head(n=5))

In [None]:
# Stamp every row with the (local, naive) time of this load, then show the frame.
load_time = pd.Timestamp.now()
nifty500['created_at'] = load_time
display(nifty500)

#### Insert Nifty 500 Data

In [None]:
# Disabled snippet: would write `nifty500` to the 'stocks_data' table, replacing
# the table if it already exists. It is commented out, so it is not executed.
# NOTE(review): the snippet opens a transaction as `conn` but passes the engine
# itself (`con=con`) to to_sql — confirm which was intended before re-enabling.
# con = create_engine(f'postgresql://{username}:{password}@{host}:{post}/{database}')
# with con.begin() as conn:
#     nifty500.to_sql('stocks_data', con=con, if_exists='replace', index=False)

In [None]:
# Inputs for the price-history request.
# interval: '1', '5' or '15' for minute bars; use '1D' for day, week or month.
symbol, interval = 'ADANIENT', '1'
# Query window bounds — presumably Unix epoch seconds; TODO confirm against the API.
start_time, end_time = '1354410000', '1758821896'
# Sent as the `countback` query parameter.
count = 10000

In [None]:

# Build the price-history request URL. The query string is generated with
# urlencode so every value is percent-encoded; with plain string formatting a
# symbol containing '&' or a space would corrupt the query. For values without
# special characters the resulting URL is unchanged.
base_url = "https://priceapi.moneycontrol.com/techCharts/indianMarket/stock/history?"
params = urlencode({
    'symbol': symbol,
    'resolution': interval,
    'from': start_time,
    'to': end_time,
    'countback': count,
    'currencyCode': 'INR',
})
url = base_url + params
print(url)

In [None]:
# Browser-like request headers sent with the price-history call.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36',
    'Accept-Language': 'en-US,en;q=0.9',
    'Accept-Encoding': 'gzip, deflate, br',
    'origin': 'https://www.moneycontrol.com'
}

# requests applies no timeout by default, so a stalled connection would block
# the notebook indefinitely; bound the wait explicitly.
response = requests.get(url, headers=headers, timeout=30)
# Surface HTTP errors (4xx/5xx) here instead of as a confusing JSON decode
# failure on an error page.
response.raise_for_status()
data = response.json()


In [None]:
# Map each output column to the key it is read from in the API response.
response_keys = {
    'metric_time': 't',
    'open_price': 'o',
    'close_price': 'c',
    'high': 'h',
    'low': 'l',
    'volume': 'v',
}

# One row per returned bar; `symbol` and `created_at` are scalars broadcast to
# every row. Column order: symbol, the response fields, then created_at.
df = pd.DataFrame({
    'symbol': symbol,
    **{column: data[key] for column, key in response_keys.items()},
    'created_at': pd.Timestamp.now(),
})

# The raw time values are interpreted as seconds since the epoch.
df['metric_time'] = pd.to_datetime(df['metric_time'], unit='s')
display(df.sort_values(by="metric_time"))

In [None]:
# Fail early with a readable message when a connection setting was not loaded;
# otherwise the literal text "None" would be formatted into the connection URL.
db_settings = {
    'DB_USER': username,
    'DB_PASS': password,
    'DB_NAME': database,
    'DB_HOST': host,
    'DB_PORT': post,
}
missing_settings = [name for name, value in db_settings.items() if value is None]
if missing_settings:
    raise RuntimeError(f"Missing database settings: {', '.join(missing_settings)}")

# Percent-encode the credentials: characters such as '@', ':' or '/' in a
# username or password would otherwise be parsed as URL delimiters.
engine = create_engine(
    f'postgresql://{quote_plus(username)}:{quote_plus(password)}@{host}:{post}/{database}'
)

# engine.begin() wraps the write in a transaction that commits on success and
# rolls back on error. if_exists='replace' drops and recreates the table, so
# each run leaves only the rows of the current `df` in 'stocks_1m_interval'.
with engine.begin() as conn:
    df.to_sql('stocks_1m_interval', con=conn, if_exists='replace', index=False)

In [None]:
def pd_timestamp_serializer(ts):
    """Fallback serializer for ``json.dumps(..., default=...)``.

    Args:
        ts: An object the JSON encoder could not serialize on its own.

    Returns:
        The ISO 8601 string for ``ts`` when it is a ``pd.Timestamp``.

    Raises:
        TypeError: If ``ts`` is not a ``pd.Timestamp``.
    """
    if not isinstance(ts, pd.Timestamp):
        raise TypeError(f"Type {type(ts)} is not serializable")
    return ts.isoformat()

In [None]:
def _encode_value(payload):
    """Serialize a message value to UTF-8 JSON bytes.

    Objects the JSON encoder cannot handle are passed to pd_timestamp_serializer.
    """
    return json.dumps(payload, default=pd_timestamp_serializer).encode('utf-8')


def _encode_key(text):
    """Encode a string message key as UTF-8 bytes."""
    return text.encode('utf-8')


# Producer used by the publishing cell; keys and values are converted to bytes
# by the helpers above.
kafkaproducer = kafka.KafkaProducer(
    bootstrap_servers=['192.168.1.15:9092'],
    key_serializer=_encode_key,
    value_serializer=_encode_value,
)


In [None]:
# Publish every row of `df` to the 'test-data' topic, keyed by symbol.
# send() is asynchronous and returns a future; the futures are kept so that
# delivery failures are raised instead of being silently dropped.
pending = []
try:
    for _, row in df.iterrows():
        record = row.to_dict()  # build the payload once per row
        pending.append(
            kafkaproducer.send(topic='test-data', key=row['symbol'], value=record)
        )

    # Block until all buffered records have been sent, then surface any
    # per-record delivery error.
    kafkaproducer.flush()
    for future in pending:
        future.get(timeout=10)
finally:
    # Always release the producer's connections, even if sending failed.
    kafkaproducer.close()

# One summary line instead of printing every row, which floods the cell output.
print(f"Sent {len(pending)} records to 'test-data'")