# Initial Dataset Manipulation: Transforming Fetched Data into a Dataset

In [1]:
# Use the %pip magic so packages are installed into the environment of the
# running kernel; a `!pip3` shell call may resolve to a different interpreter.
%pip install -r requirements.txt

Collecting telethon
  Downloading Telethon-1.38.1-py3-none-any.whl (702 kB)
[K     |████████████████████████████████| 702 kB 2.3 MB/s eta 0:00:01
Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl (125 kB)
[K     |████████████████████████████████| 125 kB 8.7 MB/s eta 0:00:01
Collecting pyaes
  Using cached pyaes-1.6.1-py3-none-any.whl
Collecting geographiclib<3,>=1.52
  Downloading geographiclib-2.0-py3-none-any.whl (40 kB)
[K     |████████████████████████████████| 40 kB 2.7 MB/s eta 0:00:011
Installing collected packages: pyaes, geographiclib, telethon, geopy
Successfully installed geographiclib-2.0 geopy-2.4.1 pyaes-1.6.1 telethon-1.38.1
You should consider upgrading via the '/Users/yaremakertytsky/Documents/Programming/Python/texty-task/scraping-from-boroshno/.venv/bin/python3 -m pip install --upgrade pip' command.[0m


# Initial Imports

In [2]:
import os
from time import sleep

import re
import csv
import logging
from dotenv import load_dotenv


import pandas as pd



from telethon import TelegramClient
from telethon.tl.types import Message

from opencage.geocoder import OpenCageGeocode
import google.generativeai as genai

In [None]:
"""
Step 1. Fetch info from telegram
"""

# Pull variables from a local .env file into the process environment.
load_dotenv()

# Telegram credentials, all read from the environment.
api_id = int(os.environ.get('API_ID'))
api_hash = os.environ.get('API_HASH')
phone_number = os.environ.get('PHONE_NUMBER')

# Send INFO-and-above records to a log file next to the notebook.
logging.basicConfig(
    filename="fetch_and_process_posts.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# A "latitude, longitude" pair: two optionally negative decimals, each with
# 1-3 integer digits and 5-19 fractional digits, separated by a comma.
coordinates_regex = r'-?\d{1,3}\.\d{5,19},\s*-?\d{1,3}\.\d{5,19}'

# Telethon client; 'session_name' is the session identifier passed to it.
client = TelegramClient('session_name', api_id, api_hash)

async def fetch_and_process_posts(channel_username):
    """Append every coordinate-bearing post of a channel to ``dataset.csv``.

    Starts the module-level ``client`` with ``phone_number``, iterates over the
    channel's messages and, for each message whose text matches
    ``coordinates_regex``, appends one CSV row holding the message timestamp,
    the whitespace-collapsed text and the first matched coordinate pair.
    A header row is written first when the file is empty.

    Failures are logged and swallowed: an error on one message skips that
    message, any other error ends the run. Nothing is raised to the caller.

    Args:
        channel_username: Channel identifier passed to ``client.iter_messages``.
    """
    try:
        await client.start(phone_number)
        logging.info(f"Client started for phone number {phone_number}")

        with open('dataset.csv', mode='a', newline='', encoding='utf-8') as csv_file:
            rows = csv.writer(csv_file)

            # Jump to the end of the file; offset 0 there means it is empty
            # and still needs its header row.
            csv_file.seek(0, 2)
            if csv_file.tell() == 0:
                rows.writerow(['timestamp', 'post', 'coordinates'])
                logging.info("CSV file created with headers.")

            async for message in client.iter_messages(channel_username):
                try:
                    text = message.text
                    if not text:
                        continue

                    found = re.search(coordinates_regex, text)
                    if found is None:
                        continue

                    # \s+ covers newlines and tabs too, so one substitution
                    # flattens the post onto a single line.
                    flattened = re.sub(r'\s+', ' ', text).strip()
                    posted_at = message.date.strftime('%Y-%m-%d %H:%M:%S')

                    rows.writerow([posted_at, flattened, found.group(0)])
                    logging.info(f"Written post with coordinates at {posted_at}")
                except Exception as e:
                    logging.error(f"Error processing message: {str(e)}")
                    continue

    except Exception as e:
        logging.error(f"Unexpected error occurred: {str(e)}")


# Channel whose posts are scraped.
channel_username = '@kiber_boroshno'

# Drive the coroutine to completion on the client's own event loop.
# NOTE(review): inside a Jupyter kernel an event loop may already be running,
# in which case run_until_complete raises RuntimeError — confirm how this
# cell is meant to be executed.
fetch_job = fetch_and_process_posts(channel_username)
client.loop.run_until_complete(fetch_job)


In [None]:
"""
Step 2. Map Locations from coordinates
"""

# Reverse-geocoding client; its key is read from the environment.
api_key = os.environ.get("OPEN_CAGE_API_KEY")
geocoder = OpenCageGeocode(api_key)

def get_location_details(lat, long):
    """Reverse-geocode a coordinate pair into ``(country, region)``.

    Uses the module-level ``geocoder`` and reads the ``components`` of its
    first result. ``region`` is the ``state`` component, falling back to
    ``city`` and then to ``'Unknown'``; ``country`` falls back to ``'Unknown'``.

    Args:
        lat: Latitude passed to ``geocoder.reverse_geocode``.
        long: Longitude passed to ``geocoder.reverse_geocode``.

    Returns:
        ``(country, region)``, or ``(None, None)`` when the lookup returns no
        results or raises (the error is logged).
    """
    try:
        matches = geocoder.reverse_geocode(lat, long)
        if not matches:
            return None, None

        # Only the first (best) match is used.
        components = matches[0]['components']
        country = components.get('country', 'Unknown')

        # Prefer state/region; fall back to the city when there is none.
        city_or_unknown = components.get('city', 'Unknown')
        region = components.get('state', city_or_unknown)
        return country, region
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        return None, None

df = pd.read_csv('dataset.csv')

# The fetch step stores each position as a single "lat, long" string in the
# `coordinates` column. Derive numeric `latitude` / `longitude` columns from it
# when they are not already present, so the loop below can read them instead
# of failing with a KeyError.
if not {'latitude', 'longitude'}.issubset(df.columns):
    lat_long = df['coordinates'].astype(str).str.extract(
        r'(-?\d+(?:\.\d+)?)\s*,\s*(-?\d+(?:\.\d+)?)'
    )
    df['latitude'] = pd.to_numeric(lat_long[0], errors='coerce')
    df['longitude'] = pd.to_numeric(lat_long[1], errors='coerce')

# Create the output columns up front so they hold arbitrary Python objects.
for output_column in ('country', 'region'):
    if output_column not in df.columns:
        df[output_column] = None

for index, row in df.iterrows():
    latitude = row['latitude']
    longitude = row['longitude']

    # Nothing to look up without a usable coordinate pair; skip the API call
    # (and its rate-limit pause) for such rows.
    if pd.isna(latitude) or pd.isna(longitude):
        logging.warning(f"Skipped row {index + 1} - no parsable coordinates")
        continue

    country, region = get_location_details(latitude, longitude)
    df.at[index, 'country'] = country
    df.at[index, 'region'] = region

    sleep(1)  # Rate limiting - 1 RPS
    logging.info(f"Updated row {index + 1} - Country: {country}, Region: {region}")

# Persist the enriched dataset back over the input file.
df.to_csv('dataset.csv', index=False)


In [32]:
"""
Step 4. Automatic Cleaning
"""

# Initial Cleaning: reload the dataset from disk
file_path = 'dataset.csv'
df = pd.read_csv(file_path)

# Keep only rows whose country is Ukraine or Russia
in_scope_country = df['country'].isin(['Ukraine', 'Russia'])
df = df[in_scope_country]

# Drop rows whose region is one of the listed Ukrainian oblasts
ukrainian_tgts = [
    "Kharkiv Oblast",
    "Sumy Oblast",
    'Lviv Oblast',
    "Chernihiv Oblast",
    "Dnipropetrovsk Oblast",
    "Kyiv Oblast",
    "Chernivtsi Oblast",
]
in_ukrainian_tgts = df['region'].isin(ukrainian_tgts)
df = df[~in_ukrainian_tgts]


# Detects posts that mention FPV strikes
def contains_fpv(post_text):
    """Return True when ``post_text`` is a string containing "fpv" or "фпв".

    The match is case-insensitive and may occur anywhere in the text.
    Non-string values (e.g. NaN from pandas) yield False.
    """
    if not isinstance(post_text, str):
        return False
    return re.search(r"fpv|фпв", post_text, re.IGNORECASE) is not None

# Drop every post flagged as FPV-related.
fpv_mask = df["post"].apply(contains_fpv)
df = df[~fpv_mask]


# Detects posts that mention assaults ("штурм...")
def contains_shturm(post_text):
    """Return True when ``post_text`` is a string containing "штурм" or "штурмовик".

    The match is case-insensitive and may occur anywhere in the text.
    Non-string values yield False.
    """
    if not isinstance(post_text, str):
        return False
    return re.search(r"штурм|штурмовик", post_text, re.IGNORECASE) is not None


# Drop every post flagged as assault-related.
shturm_mask = df["post"].apply(contains_shturm)
df = df[~shturm_mask]

# Detects posts that name one of the listed drones (Orlan, Zala, Supercam, Lancet)
def contains_aa_fpv(post_text):
    """Return True when ``post_text`` is a string containing a listed drone name.

    Each name must appear as a whole word (``\\b`` on both sides); the match is
    case-insensitive. Non-string values yield False.
    """
    if not isinstance(post_text, str):
        return False

    drone_names = ["орлан", "orlan", "зала", "zala", "суперкам", "supercam", "ланцет", "lancet"]
    whole_word = rf"\b({'|'.join(drone_names)})\b"
    return re.search(whole_word, post_text, re.IGNORECASE) is not None

# Drop every post that names one of the listed drones.
aa_fpv_mask = df["post"].apply(contains_aa_fpv)
df = df[~aa_fpv_mask]

# Detects posts related to tanks and IFVs
def contains_tank(post_text):
    """Return True when ``post_text`` is a string containing a listed armour term.

    Each term must appear as a whole word (``\\b`` on both sides); the match is
    case-insensitive. Non-string values yield False.
    """
    if not isinstance(post_text, str):
        return False

    armour_terms = ["танк", "екіпаж", "бредлі", "bradley", "бмп", "бтр", "мтлб", "бмд"]
    whole_word = rf"\b({'|'.join(armour_terms)})\b"
    return re.search(whole_word, post_text, re.IGNORECASE) is not None

# Drop every post flagged as tank/IFV-related.
tank_mask = df["post"].apply(contains_tank)
df = df[~tank_mask]

def contains_captivity_mentions(post_text):
    """Return True when ``post_text`` is a string containing "полон".

    The match is case-insensitive and may occur anywhere in the text, so
    longer words containing it also match. Non-string values yield False.
    """
    if not isinstance(post_text, str):
        return False
    return re.search(r"полон", post_text, re.IGNORECASE) is not None

# Drop every post that mentions captivity.
captivity_mask = df["post"].apply(contains_captivity_mentions)
df = df[~captivity_mask]

# Detects posts related to infantry strikes and frontline action
def infantry_strikes(post_text):
    """Return True when ``post_text`` mentions infantry or a listed unit type.

    Two kinds of keywords are recognised, both case-insensitively:

    * word stems, matched as the beginning of a word so that inflected forms
      ("піхота", "піхоти", "піхоту", ...) are caught;
    * unit abbreviations, matched as whole words only so that short ones such
      as "тро" do not fire inside unrelated words.

    Args:
        post_text: Post text; any non-string value (e.g. NaN) yields False.

    Returns:
        bool: True if any keyword is found, otherwise False.
    """
    # Stems: a trailing \b directly after the stem would reject every
    # inflected form, so allow any word characters to follow.
    stems = ["піхот"]
    # Abbreviations: must stand alone as a word.
    abbreviations = [
        "омбр",
        "ошб",
        "обрмп",
        "нгу",
        "омпбр",
        "оабр",
        "тро",
        "огшбр",
        "оаебр",
    ]
    alternatives = [stem + r"\w*" for stem in stems] + abbreviations
    pattern = r"\b(?:" + "|".join(alternatives) + r")\b"

    if not isinstance(post_text, str):
        return False
    return re.search(pattern, post_text, re.IGNORECASE) is not None

# Drop every post flagged as infantry/frontline-related.
infantry_mask = df["post"].apply(infantry_strikes)
df = df[~infantry_mask]

# Write the automatically cleaned dataset back over the input file.
df.to_csv("dataset.csv", index=False)

"""
Due to limited resources in terms of API credit for LLM i've also conducted manual screening to remove rows unrelated to topic of EDA
"""

# AI Operations

In [8]:
# Reload the cleaned dataset.
df = pd.read_csv("dataset.csv")

# Columns the AI step is expected to fill in
columns = [
    "Liquidation Type",
    "Attack Info",
    "Object Type",
    "Object Name",
    "Weapon Type Used",
    "Weapon Name",
    "Special FLAG",
]

# Add any of those columns the dataset does not have yet, initialised empty.
for col in columns:
    if col in df.columns:
        continue
    df[col] = None

def call_gemini_api(post_text):
    """Ask Gemini to extract structured fields from one post.

    Builds a fixed instruction prompt around ``post_text``, configures the
    ``genai`` client with the ``GEMINI_API_KEY`` environment variable and
    sends the prompt to the "gemini-1.5-flash" model.

    Args:
        post_text: Text of the post, interpolated into the prompt.

    Returns:
        The response text as returned by the model, or None when any step of
        the request raises (the error is logged).
    """
    prompt = f"""
    Task: Extract structured data from a social media post and return it in structured format.

    Post: "{post_text}"

    Format:
    Liquidation Type: (Human / Structure / F)
    Object Type: (Military Base, Infrastructure, Oil Refining, Warehouse, Aerodrome, Air Defence, Transport, Port / F)
    Object Name: (Specific name or F)
    Weapon Type Used: (cruise missile, drone, etc. / F)
    Weapon Name: (Specific weapon name or F)
    Special FLAG: (Anomaly, missing data, etc / None)
    F stands for Failure or any form of misunderstanding.

    Response:
    Answer in csv format without mentioning name of columns
    """

    try:
        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        reply = genai.GenerativeModel("gemini-1.5-flash").generate_content(prompt)
        # Reading .text stays inside the try so a failure there is logged too.
        return reply.text
    except Exception as e:
        logging.error(f"Gemini API Error: {e}")
        return None
    

def _parse_gemini_row(raw_response, all_columns):
    """Turn the output of call_gemini_api into a {column: value} mapping.

    The model is asked for a single CSV line without a header. The first
    non-empty line that is not a markdown code fence is parsed as CSV and its
    values are assigned positionally: to all of ``all_columns`` when the count
    matches, otherwise to ``all_columns`` without "Attack Info" (the prompt
    does not request that field). Returns None when the value count matches
    neither layout. A dict is passed through unchanged.
    """
    if isinstance(raw_response, dict):
        return raw_response

    stripped = (line.strip() for line in str(raw_response).splitlines())
    payload = [line for line in stripped if line and not line.startswith("```")]
    if not payload:
        return None

    values = [value.strip() for value in next(csv.reader([payload[0]], skipinitialspace=True))]
    prompt_fields = [name for name in all_columns if name != "Attack Info"]
    for field_names in (all_columns, prompt_fields):
        if len(values) == len(field_names):
            return dict(zip(field_names, values))
    return None


# The post text lives in the `post` column written by the fetch step; accept
# `post_text` as well in case the dataset uses that name.
text_column = "post_text" if "post_text" in df.columns else "post"

# Columns that came back from CSV entirely empty are float NaN columns; make
# them object-typed so string values can be stored without dtype conflicts.
df[columns] = df[columns].astype(object)

for index, row in df.iterrows():
    current_value = row["Liquidation Type"]
    # Empty cells are None when freshly created but NaN after a CSV round
    # trip; pd.isna covers both. "NONE" marks an earlier unsuccessful attempt.
    if not (pd.isna(current_value) or current_value == "NONE"):
        continue

    extracted_data = call_gemini_api(row[text_column])
    if not extracted_data:
        continue

    parsed = _parse_gemini_row(extracted_data, columns)
    if parsed is None:
        logging.warning(f"Row {index}: could not parse Gemini response: {extracted_data!r}")
        continue

    for col in columns:
        df.at[index, col] = parsed.get(col, "NONE")


df.to_csv("dataset_ready.csv", index=False)

logging.info("Processing completed. Updated dataset saved.")


Processing completed. Updated dataset saved.


  from .autonotebook import tqdm as notebook_tqdm
