In [1]:
import os
import re
import tempfile
import urllib.request
from datetime import datetime

import mlflow
from gcsfs import GCSFileSystem
from omegaconf import OmegaConf
from pymongo import MongoClient, InsertOne, UpdateOne



In [9]:

def mongo_client(config_file_path):
    """Build a MongoDB client from the settings in a config file.

    The file is loaded with OmegaConf and must expose ``main.server_ip``,
    ``main.port`` and ``main.login``. When ``main.login`` is truthy,
    ``main.user`` and ``main.passwd`` are passed to the client as
    credentials; otherwise the connection is opened without them.

    Args:
        config_file_path: Path of the configuration file to load.

    Returns:
        A ``MongoClient`` pointed at ``mongodb://<server_ip>:<port>``.
    """
    settings = OmegaConf.load(config_file_path).main
    server_ip, port = settings.server_ip, settings.port

    # Credentials are only read from the config when login is enabled.
    credentials = {}
    if settings.login:
        credentials = {'username': settings.user, 'password': settings.passwd}

    return MongoClient(f'mongodb://{server_ip}:{port}', **credentials)



class FileHandler():
    """Download remote files and store them in Google Cloud Storage."""

    # Number of bytes copied per read when moving a download into the bucket.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, gc_project):
        """Open a GCS file system for the given project.

        Args:
            gc_project: Google Cloud project passed to ``GCSFileSystem``.
        """
        self.file_system = GCSFileSystem(project=gc_project)

    def spot_download_helper(self, url, dest_name):
        """Download ``url`` and store it as ``tv_spots_store/<dest_name>``.

        The file is first downloaded to a local temporary file and only then
        copied to the destination, so a failed download does not create an
        empty destination object.

        Args:
            url: Address of the file to download.
            dest_name: File name to use inside ``tv_spots_store/``.

        Returns:
            On success, a tuple ``(dest_file, file_url)`` where ``dest_file``
            is the destination path and ``file_url`` is the value returned by
            the file system's ``info()`` for that path.
            On failure, the error message as a ``str`` (callers must check
            the type before unpacking).
        """
        dest_file = 'tv_spots_store/'+dest_name
        print(f"FILENAME:  {dest_file}")
        with tempfile.NamedTemporaryFile() as bin_file:
            try:
                # Download before touching the destination.
                urllib.request.urlretrieve(url, bin_file.name)
                with self.file_system.open(dest_file, mode='wb') as file:
                    # Copy in chunks instead of loading the whole file in memory.
                    for chunk in iter(lambda: bin_file.read(self._CHUNK_SIZE), b''):
                        file.write(chunk)
            except Exception as e:
                return str(e)
        # Ask for metadata only after the write handle is closed, so that it
        # describes the finished upload rather than a pre-created placeholder.
        file_url = self.file_system.info(dest_file)
        return (dest_file, file_url)

In [3]:
# START SERVER IF NEEDED - docker run --name testing-mongo -p 27017:27017 mongo:latest
# Connect with the settings in conf/mongo-config.yaml and select the TV_scan
# database; `db` is the handle used by the cells below.
server = mongo_client('conf/mongo-config.yaml')
db = server.TV_scan


In [None]:
# Runs the MLflow project located at components/spotreg_update.
# Open question: is it necessary to run this update every time?
# NOTE(review): presumably refreshes the spot registry in MongoDB — confirm.


mlflow.run('components/spotreg_update')


In [None]:
# Below are the names of the collections
#db['spots_inventory']
#db['Proc_Jobs']

In [None]:
# Query filter used to select the spots to process. Cheat-sheet of filter forms
# (source: https://www.programmingfunda.com/filter-records-in-mongodb-using-python/):
#   simple equality    - {'field1': 'value', 'field2': 'value'}
#   number comparison  - {"field": {"$gt": 25}} OR {"field": {"$lt": 25}}
#                        OR a range: {"field": {"$gt": 18, "$lt": 25}}
#   in list of values  - {'field': {'$in': ['item1', 'item2']}}
#   regex              - {"field": {"$regex": "^M"}}
################
# MORE OPERATORS - https://www.mongodb.com/docs/manual/reference/operator/query/
################
filtering = {'BrandDesc':'PEUGEOT'}

In [None]:
# Query the spots inventory for the documents matching `filtering`.
# NOTE(review): presumably a lazy PyMongo cursor that can only be consumed
# once — confirm before reusing it across several cells.
cursor = db['spots_inventory'].find(filtering)

In [None]:
# INSPECT CURSOR
# Sample up to four matching documents (positions 1-4; the first match is skipped).
# The slice is taken on a clone: slicing a PyMongo cursor sets skip/limit on
# that same cursor object, so slicing `cursor` directly would leave it consumed
# and restricted for any later cell that iterates it.
docs_toinspect = list(cursor.clone()[1:5])
    

In [None]:
# Display the sampled documents.
docs_toinspect

In [None]:
# Collect the _id of every document matching `filtering`; this list becomes the
# job's `doc_list`.
# The query is issued afresh (with an _id-only projection) instead of iterating
# the shared `cursor`: once the inspection cell has sliced and consumed that
# cursor, iterating it again would produce an empty list.
docs_toproc = [
    doc['_id']
    for doc in db['spots_inventory'].find(filtering, {'_id': 1})
]
    

In [4]:
# Register a processing job: the list of document ids to handle, a reference
# name to find the job again, its initial status and the insertion timestamp.
job_reference = 'test_peugeot'
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

job_record = {
    'doc_list': docs_toproc,
    'j_ref': job_reference,
    'status': 'Pending Processing.',
    'ins_timestamp': now,
}
db['Proc_Jobs'].insert_one(job_record)

In [5]:
# INSPECT
# Read the job back by its reference, then query the inventory for exactly
# the documents listed in that job.
job = db['Proc_Jobs'].find_one({'j_ref': job_reference})

job_doc_ids = job['doc_list']
filter_verif = db['spots_inventory'].find({'_id': {'$in': job_doc_ids}})

In [6]:
# Sample up to four of the job's documents (positions 1-4 of the query result).
proc_toinspect = list(filter_verif[1:5])
    

In [7]:
# Display the sampled job documents.
proc_toinspect

[{'_id': ObjectId('63da36736ab4946366e9d21f'),
  'SpotCode': 417291,
  'SpotDesc': 'LIDER DE VENDAS EM PT 2021.MELHOR MARCA AUTOMOVEL 9\\ANO',
  'VehicleDesc': 'SIC',
  'BrandDesc': 'PEUGEOT',
  'SubBrandDesc': nan,
  'SectorDesc': 'INDUSTRIA AUTOMOVEL',
  'CategoryDesc': 'VEICULOS A MOTOR',
  'ClassDesc': 'AUTOMOVEIS',
  'SubClassDesc': 'OUTROS OU DIVERSOS',
  'AdvertiserDesc': 'PEUGEOT PORTUGAL AUTOMOVEIS SA',
  'MediaFile': 417291,
  'MediaFileOldUrl': 'http://e-sol.mediamonitor.pt/SOLFILES/TELEVISAO/20220103/417291.wmv'},
 {'_id': ObjectId('63da36736ab4946366e9d220'),
  'SpotCode': 417315,
  'SpotDesc': 'LIDER DE VENDAS PT 2021.MELHOR MARCA AUTO 9\\ANO-SH SICNOT',
  'VehicleDesc': 'SIC Noticias',
  'BrandDesc': 'PEUGEOT',
  'SubBrandDesc': nan,
  'SectorDesc': 'INDUSTRIA AUTOMOVEL',
  'CategoryDesc': 'VEICULOS A MOTOR',
  'ClassDesc': 'AUTOMOVEIS',
  'SubClassDesc': 'OUTROS OU DIVERSOS',
  'AdvertiserDesc': 'PEUGEOT PORTUGAL AUTOMOVEIS SA',
  'MediaFile': 417315,
  'MediaFileOldUrl

In [14]:
# Launch the MLflow project in components/get_data for the job registered above.
run_parameters = {
    'job_reference': job_reference,
}

mlflow.run(
    uri='components/get_data',
    parameters=run_parameters,
)

2023/02/01 16:41:17 INFO mlflow.projects.docker: === Building docker image get_data:b183085 ===
2023/02/01 16:41:18 INFO mlflow.projects.utils: === Created directory /tmp/tmpcsjstdao for downloading remote URIs passed to arguments of type 'path' ===
2023/02/01 16:41:18 INFO mlflow.projects.backend.local: === Running command 'docker run --rm -v /home/laertio/projects/dev/tv_scan/mlruns:/mlflow/tmp/mlruns -v /home/laertio/projects/dev/tv_scan/mlruns/0/9eebfc7d37164b92a006bdcccd4d69da/artifacts:/home/laertio/projects/dev/tv_scan/mlruns/0/9eebfc7d37164b92a006bdcccd4d69da/artifacts -v /home/laertio/projects/dev/tv_scan/conf:/conf -e MLFLOW_RUN_ID=9eebfc7d37164b92a006bdcccd4d69da -e MLFLOW_TRACKING_URI=file:///mlflow/tmp/mlruns -e MLFLOW_EXPERIMENT_ID=0 get_data:b183085 python run.py tv_scan_test spots_inventory /conf/mongo-config.yaml test_peugeot --wandblogin <REDACTED>' in run with ID '9eebfc7d37164b92a006bdcccd4d69da' === 
wandb: W&B API key is configured. U

<mlflow.projects.submitted_run.LocalSubmittedRun at 0x7f41600bf760>

In [10]:
##### TESTING THE FILE SYSTEM
# Create the storage handler for the 'publicispt-datastage' Google Cloud project.

gcs_handler = FileHandler('publicispt-datastage')

In [12]:
# Download each sampled spot to cloud storage and prepare one MongoDB update
# per document recording where the file was stored.
db_updates = []

for doc in proc_toinspect:
    # Building parameters
    url = doc['MediaFileOldUrl']
    print(url)
    spot_code = str(doc['SpotCode'])
    # str.replace() matches literal text, so the regex has to go through re.sub.
    # \W also matches whitespace, so a single pass replaces both whitespace and
    # other non-word characters. str() guards against non-string brand values.
    brand = re.sub(r'\W', '_', str(doc['BrandDesc']))
    # Take the real extension instead of assuming it is three characters long.
    file_format = os.path.splitext(url)[1]
    dest_name = spot_code+"_"+brand+file_format
    # Download the file and store it
    result = gcs_handler.spot_download_helper(url, dest_name)
    if isinstance(result, str):
        # The helper reports a failure by returning the error message as a
        # string; skip this document instead of crashing on the tuple unpack.
        print(f"SKIPPED {url}: {result}")
        continue
    dest_file, file_url = result
    doc_updates = {
        'GCP_path': file_url,
        'spot_store_filename': dest_file,
    }
    db_updates.append(UpdateOne({'_id': doc['_id']}, {'$set': doc_updates}))

http://e-sol.mediamonitor.pt/SOLFILES/TELEVISAO/20220103/417291.wmv
FILENAME:  tv_spots_store/417291_PEUGEOT.wmv
http://e-sol.mediamonitor.pt/SOLFILES/TELEVISAO/20220103/417315.wmv
FILENAME:  tv_spots_store/417315_PEUGEOT.wmv
http://e-sol.mediamonitor.pt/SOLFILES/TELEVISAO/20220117/417741.wmv
FILENAME:  tv_spots_store/417741_PEUGEOT.wmv
http://e-sol.mediamonitor.pt/SOLFILES/TELEVISAO/20220119/417811.wmv
FILENAME:  tv_spots_store/417811_PEUGEOT.wmv


In [13]:
# Apply all prepared updates in one round trip. PyMongo raises InvalidOperation
# when given an empty operation list, so skip the call when there is nothing
# to write; the result (or None) is displayed as the cell output.
bulk_result = db['spots_inventory'].bulk_write(db_updates) if db_updates else None
bulk_result

<pymongo.results.BulkWriteResult at 0x7f415a706710>