In [3]:
import psutil
import os

# Handle on the current kernel process; later cells sample its memory usage.
# (A stray debugging `print(2)` was removed from this cell.)
pid = os.getpid()
py = psutil.Process(pid)

2


In [2]:
# Baseline resident set size (first field of memory_info(), i.e. RSS) in MiB,
# taken before the data is loaded.
memory_before = py.memory_info().rss / (1024 ** 2)
memory_before

'\ntransform lists to row\n'

In [3]:
import logging
import csv
from typing import Any, List, Callable,Tuple,Optional
from collections import namedtuple
from inspect import signature # interesting
from functools import partial
import concurrent.futures
from numba import jit

'\nmaintain the number of unique consumers\n'

In [4]:
# Timestamped, INFO-level logging used by the pipeline classes below.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(name=__name__)

'\nmaintain list of segments (unique)\n'

In [5]:
# One input row: the user identifier and that user's raw ads string.
UserAds = namedtuple('UserAds', 'user_id ads')

class Notifier:
    """Data descriptor that keeps one value per owner instance and logs changes.

    The value lives in the owning instance's ``__dict__`` under a private key,
    so every instance has its own value. (Previously a single value was stored
    on the descriptor itself and was therefore shared by all instances of the
    owner class.) When the descriptor was created with ``name == "error"``,
    overwriting an existing non-None value is logged at INFO level.

    Args:
        name: Label used in log messages and to derive the storage key.
    """

    def __init__(self, name):
        self.name = name
        # Private storage key; it must not collide with ordinary instance
        # attributes that happen to share the label (e.g. ``Handler.error``).
        self._key = f"_notifier_{name}"

    def __get__(self, instance, owner):
        # Accessed on the class itself: return the descriptor, per convention.
        if instance is None:
            return self
        return instance.__dict__.get(self._key)

    def __set__(self, instance, value):
        previous = instance.__dict__.get(self._key)
        if self.name == "error" and previous is not None:
            logger.info(f"Changing {self.name} from {previous} to {value}")
        instance.__dict__[self._key] = value


class Handler:
    """Outcome of an operation: a payload in ``value`` or a failure in ``error``.

    ``value`` falls back to a new empty list when omitted or None, so callers
    can always iterate it; ``error`` stays None when the operation succeeded.
    """

    notification = Notifier("error")

    def __init__(self, value: Any = None, error: str = None):
        # Build a fresh list per instance rather than sharing a mutable default.
        if value is None:
            value = []
        self.value = value
        self.error = error

    def is_success(self) -> bool:
        """Return True when no error has been recorded."""
        failed = self.error is not None
        return not failed

    def __repr__(self) -> str:
        return "Handler(value={}, error={})".format(self.value, self.error)


class Transformer:
    """Steps for cleaning one user's comma-separated ads string.

    Each step consumes the previous step's output; ``transform`` chains them.
    ``UNIQUE_ADS`` is a class-level set shared by every caller and instance;
    ``remove_ad`` and ``unique_segments`` add ids to it.
    """

    UNIQUE_ADS = set()

    @classmethod
    def unique_segments(cls, ad: int):
        """Record a single ad id in the shared ``UNIQUE_ADS`` set."""
        cls.UNIQUE_ADS.add(ad)

    @staticmethod
    def string_to_list(ads: str) -> List[int]:  # can be placed in Utils class
        """Parse a comma-separated string of ad ids into a list of ints.

        Blank tokens (from trailing or doubled commas, e.g. ``"1,2,"``) and
        surrounding whitespace are ignored instead of aborting the parse.

        Returns:
            The ids in input order, or ``[]`` for empty input. If a token is
            not an integer, the error is logged and a ``Handler`` carrying the
            exception and the raw string is returned instead.
        """
        if not ads:
            return []
        tokens = (token.strip() for token in ads.split(','))
        try:
            return [int(token) for token in tokens if token]
        except ValueError as e:
            logger.error(f"Error converting string to list: {e}")
            return Handler(error=e, value=ads)

    @staticmethod
    def remove_duplicates(ads: List[int]) -> List[int]:
        """Return ``ads`` without repeats, keeping first-occurrence order."""
        # dicts preserve insertion order, giving an order-stable O(n) dedupe.
        return list(dict.fromkeys(ads))

    @staticmethod
    def sort_by_priority(ads: List[int], group: set) -> List[int]:
        """Sort ascending, with every id in ``group`` placed before all others."""
        # False sorts before True, so members of ``group`` come first.
        return sorted(ads, key=lambda x: (x not in group, x))

    @staticmethod
    def remove_ad(cls, ads: List[int], ad: int) -> List[int]:
        """Return ``ads`` without any occurrence of ``ad``; record the survivors.

        NOTE: despite ``@staticmethod``, the first argument must be the
        Transformer class or an instance of it; it is only used to reach the
        shared ``UNIQUE_ADS`` set. It stays static so that existing
        ``partial(Transformer.remove_ad, transformer, ad=...)`` call sites
        keep working.
        """
        remaining = [x for x in ads if x != ad]
        cls.UNIQUE_ADS.update(remaining)
        return remaining

    @staticmethod
    def remove_group(ads: List[int], group: set) -> List[int]:
        """Remove the ids in ``group`` from ``ads``, but only if all are present.

        One occurrence of each group member is removed. A new list is
        returned; the caller's list is not mutated.
        """
        remaining = list(ads)
        if set(group).issubset(remaining):
            for ad in group:
                remaining.remove(ad)
        return remaining

    @staticmethod
    def list_to_str(ads: List[int]) -> str:  # can be placed in Utils class
        """Join ad ids into a comma-separated string (``""`` for empty input)."""
        if not ads:
            return ""
        try:
            return ",".join(map(str, ads))
        except ValueError as e:
            logger.error(f"Error converting list to string: {e}")
            return Handler(error=e, value=ads)

    @staticmethod
    def remove_user_without_ad(user_id: str, ads: List[int]) -> Optional[Tuple[str, List[int]]]:
        """Return ``(user_id, ads)`` when ``ads`` is non-empty, else None."""
        return (user_id, ads) if ads else None

    @classmethod
    def transform(cls, ads: str, steps: List[Callable]) -> Any:
        """Feed ``ads`` through each callable in ``steps``, in order.

        Non-callable entries are skipped. Returns whatever the last step
        returns (a ``str`` when the pipeline ends with ``list_to_str``).
        """
        data = ads
        for step in steps:
            if callable(step):
                data = step(data)
        return data

from concurrent.futures import ThreadPoolExecutor, as_completed
import csv

class AdsExtractor:
    """Reads ``user_id<delimiter>ads`` rows from a delimited text file.

    Args:
        filename: Path of the input file.
        delimiter: Field separator passed to ``csv.reader``.
    """

    def __init__(self, filename, delimiter):
        self.filename = filename
        self.delimiter = delimiter
        self.data: List[UserAds] = []

    def _process_row(self, row) -> UserAds:
        """Build a ``UserAds`` from the first two fields of ``row``.

        Raises:
            IndexError: If ``row`` has fewer than two fields.
        """
        return UserAds(row[0], row[1])

    def load_data(self) -> Handler:
        """Load every row into ``self.data``, preserving file order.

        Completely empty rows (blank lines) are skipped.

        Returns:
            ``Handler(value=self.data)`` on success, or a ``Handler`` whose
            ``error`` describes the failure (missing file, permission denied,
            CSV parse error, a row with fewer than two fields, or anything
            unexpected). ``self.data`` is only replaced on success.
        """
        try:
            # newline='' is required by the csv module so that quoted fields
            # containing line breaks are parsed correctly.
            with open(self.filename, 'r', newline='') as file:
                reader = csv.reader(file, delimiter=self.delimiter)
                # Processed sequentially while streaming. The previous
                # ThreadPoolExecutor + as_completed() version returned rows in
                # completion order (scrambling the file order). Building one
                # tuple per row is also far too cheap to benefit from threads.
                self.data = [self._process_row(row) for row in reader if row]
                return Handler(value=self.data)

        except FileNotFoundError:
            logger.error(f"The file '{self.filename}' was not found.")
            return Handler(error=f"The file '{self.filename}' was not found.")
        except PermissionError:
            logger.error(f"Permission denied when trying to open '{self.filename}'.")
            return Handler(error=f"Permission denied when trying to open '{self.filename}'.")
        except csv.Error as e:
            logger.error(f"Problem reading CSV file: {e}")
            return Handler(error=f"Problem reading CSV file: {e}")
        except IndexError:
            logger.error("Row is missing values. Each row must contain a user_id and segments.")
            return Handler(error="Row is missing values. Each row must contain a user_id and segments.")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
            return Handler(error=f"An unexpected error occurred: {e}")



'\ntrack the average number of transformations per consumer\n'

In [6]:
# Ids in this group sort ahead of all other ads (each partition ascending).
priority_group = {188, 449, 561, 484, 507, 520, 519}
# Ids removed together, and only when every one of them is present.
remove_group = {519, 763, 988}
# Single id stripped from every user's list.
ad = 843

transformer = Transformer()

# Ordered pipeline applied by Transformer.transform to each raw ads string:
# parse -> de-duplicate -> prioritise -> drop group -> drop ad -> join back.
transformation_steps = [
    Transformer.string_to_list,
    partial(Transformer.remove_duplicates),
    partial(Transformer.sort_by_priority, group=priority_group),
    partial(Transformer.remove_group, group=remove_group),
    # remove_ad receives the transformer first to reach the shared UNIQUE_ADS set.
    partial(Transformer.remove_ad, transformer, ad=ad),
    Transformer.list_to_str,
]

'\nget the number of processed rows and unique segments\n'

In [7]:
# Read every tab-separated (user_id, ads) row from the input file.
adsLoader = AdsExtractor(delimiter='\t', filename='full.txt')
loading_results = adsLoader.load_data()

'\ngive me consumers who are in more than a specific number of segments\n'

In [8]:
def save_to_txt(data, filename="output.txt"):
    """Write ``(user_id, ads)`` pairs to ``filename``, one tab-separated line each.

    List-valued ``ads`` are comma-joined; any other value is written via ``str``.
    The file is overwritten.
    """
    with open(filename, mode='w') as out:
        for user_id, processed_ads in data:
            if isinstance(processed_ads, list):
                ads_str = ",".join(str(item) for item in processed_ads)
            else:
                ads_str = str(processed_ads)
            out.write(f"{user_id}\t{ads_str}\n")


'\ninsert a new consumer and update an existing one\n'

In [9]:
if not loading_results.is_success():
    print(f"Error loading data: {loading_results.error}")
else:
    result_data = []
    for user_ads in loading_results.value:
        user_id = user_ads.user_id
        processed_ads = Transformer.transform(ads=user_ads.ads, steps=transformation_steps)
        # Users left with no ads after the pipeline are dropped from the output.
        if len(processed_ads) > 0:
            result_data.append((user_id, processed_ads))
    save_to_txt(result_data, filename="output.txt")

'\nremove a segment\n'

In [10]:
# RSS in MiB after processing. Stored as `memory_after` so the baseline
# `memory_before` is kept (this cell previously overwrote it), and shown
# together with the growth since the baseline.
memory_after = py.memory_info().rss / 2.**20
memory_after, memory_after - memory_before

'\ncheck if a segment code exists\n'

In [11]:
# UNIQUE_ADS is a class-level set, shared by every Transformer instance.
type(transformer).UNIQUE_ADS

'\nremove a combination of segments\n'

In [12]:
# e881357f9b81ccc4621c70f1644124cc
# 1a8b1d6f95148119d421483609576b06

'\nremove segments from a consumer who has more than a given number of segments, restricted to a specific group\n'

'\nremove a consumer with a specific combination of segments\n'