In [1]:
import os
import pandas as pd
from tqdm import tqdm
from Scripts.XML_Extraction.Extract_XML import extract_from_xml

In [9]:
import paramiko

# Server details
hostname = 'ec2-18-168-204-252.eu-west-2.compute.amazonaws.com'
username = 'deploy'
pem_file_path = r'G:\$00-Work\ImpactScore\Connecting to EC2\impactscore.pem'
remote_dir = '/mnt/brandbank/7622210249661'

# Connect to the server
try:
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname, username=username, key_filename=pem_file_path)
    
    # Open SFTP session
    sftp = ssh.open_sftp()
    
    # List contents of the specific directory
    items = sftp.listdir(remote_dir)
    
    # Print each item in the directory
    print(f"Contents of '{remote_dir}':")
    for item in items:
        print(item)
    
    # Close the SFTP session and SSH connection
    sftp.close()
    ssh.close()
except Exception as e:
    print("Connection failed:", e)


Contents of '/mnt/brandbank/7622210249661':
11420646.xml
2413957.xml
12173865.jpg
11306394.xml
5866702.xml
5866702.jpg
2413957.jpg
10618390.xml
11306394.jpg
12173865.xml
11420646.jpg


In [11]:
import paramiko
from tqdm import tqdm
import pandas as pd
import io
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

# Server details
hostname = 'ec2-18-168-204-252.eu-west-2.compute.amazonaws.com'
username = 'deploy'
remote_dir = '/mnt/brandbank/'
pem_file_path = r'G:\$00-Work\ImpactScore\Connecting to EC2\impactscore.pem'

# Define columns for the final DataFrame
columns = [
    'name', 'gtin', 'isDelete', 'date', 'pl1', 'pl2', 'pl3', 
    'description', 'allergyAdvice', 'recycling', 'recycling_other', 
    'brands', 'marketing', 'ingredients', 'features', 'storage', 'preserves'
]

# Function to process each XML file using a separate SSH connection
def process_file(filename, filepath):
    try:
        print(f"Establishing SSH connection for file {filename}...")

        # Set up a new SSH and SFTP connection for each thread
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostname, username=username, key_filename=pem_file_path)
        sftp = ssh.open_sftp()

        # Read and process the XML file
        print(f"Processing file: {filename}")
        with sftp.file(filepath, 'r') as file:
            xml_content = file.read().decode('utf-8')
            productList = extract_from_xml(io.StringIO(xml_content), filename)

        # Notify closing of the connection
        print(f"Closing connection for file {filename}...")
        sftp.close()
        ssh.close()
        return productList
    except Exception as e:
        print(f"Failed to process {filename}: {e}")
        return None

# Helper function to recursively find all XML files in subdirectories
def find_all_xml_files(sftp, path):
    xml_files = []
    folders = [path]

    # Initialize tqdm progress bar
    folder_count = 0
    with tqdm(total=len(folders), desc="Processing Folders") as folder_bar:
        # Walk through the directory recursively
        while folders:
            current_folder = folders.pop(0)
            folder_count += 1

            # Update progress bar total dynamically
            folder_bar.total = folder_count
            folder_bar.refresh()
            
            # Process each folder and list its items
            for item in sftp.listdir_attr(current_folder):
                item_path = os.path.join(current_folder, item.filename)
                if item.st_mode & 0o40000:  # Check if it's a directory
                    folders.append(item_path)
                    folder_count += 1
                elif item.filename.endswith('.xml'):  # Check for XML files
                    xml_files.append(item_path)

            folder_bar.update(1)

    print(f"Total folders processed: {folder_count}")
    print(f"Total XML files found: {len(xml_files)}")
    return xml_files

# Main execution with parallel processing
try:
    print("Establishing initial connection to list files...")
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname, username=username, key_filename=pem_file_path)
    sftp = ssh.open_sftp()
    
    print("Searching for XML files in all subdirectories...")
    files = find_all_xml_files(sftp, remote_dir)
    sftp.close()
    ssh.close()
    
    if not files:
        print("No XML files found in the directory or its subdirectories.")
    else:
        print("Initial connection closed after finding files.")

        # Process files in parallel using multiple connections
        outList = []
        print("Starting parallel processing of files...")
        with ThreadPoolExecutor(max_workers=4) as executor:
            future_to_file = {executor.submit(process_file, os.path.basename(file), file): file for file in files}

            # Track progress with tqdm and notify each file completion
            with tqdm(total=len(files), desc="Processing XML Files") as pbar:
                for future in as_completed(future_to_file):
                    result = future.result()
                    filename = future_to_file[future]
                    if result:
                        outList.append(result)
                        print(f"File {filename} processed successfully.")
                    else:
                        print(f"File {filename} processing failed.")
                    pbar.update(1)

        # Convert results to DataFrame
        print("Creating final DataFrame...")
        out_df = pd.DataFrame(outList, columns=columns)
        print("DataFrame created successfully with", len(out_df), "records.")

except Exception as e:
    print("Error during processing:", e)

finally:
    print("Closing any remaining open connections.")


Establishing initial connection to list files...
Searching for XML files in all subdirectories...


Processing Folders:   1%|          | 3164/505269 [07:25<19:39:36,  7.09it/s] 

Closing any remaining open connections.





KeyboardInterrupt: 

In [3]:
out_df

Unnamed: 0,name,gtin,isDelete,date,pl1,pl2,pl3,description,allergyAdvice,recycling,recycling_other,brands,marketing,ingredients,features,storage,preserves
0,905597.xml,0000000000185,0,2010-08-16,Dairy & Bread,White Bread & Rolls,Other,Waitrose Fig & Raisin Loaf 400g,,,,Waitrose,,,,,
1,643362.xml,0000000000246,0,2006-02-24,Deli,Unallocated,Unallocated,WR Gingerbread biscuit chick each,,,,,,,,,
2,905600.xml,0000000000260,0,2010-08-16,Dairy & Bread,White Bread & Rolls,French Bread/ Rolls,Waitrose Petit Parisienne 290g,,,,Waitrose,,,,,
3,980223.xml,0000000000581,0,2011-02-28,Fruit & Vegetables,Salad Products,Peppers,Waitrose Loose Yellow Peppers,,,,Waitrose,,,,,
4,8445550.xml,00000000006491,0,2020-10-14,Fruit & Vegetables,Apples,Other Apples,Asda Pink Lady Apples,,,,Asda,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
157840,2375232.xml,9857423623122,0,2014-08-12,Meat & Fish,Fresh Fish,Salmon,Wrights of Howth Sliced Smoked Salmon 400gr,,,,Wrights of Howth,,"Salmon, Salmo Salar Salt, Oak Smoke",,Keep Refrigerated,
157841,5596613.xml,9993510358956,0,2018-03-19,Meat & Fish,Shellfish,Other Shellfish,Morrisons Fresh Live Scottish Mussels 1kg,,,,Morrisons,,,,"Not Suitable for Home Freezing, Keep Refrigerated",
157842,4607178.xml,9999999999994,0,2017-07-20,Meat & Fish,Lamb,Other Lamb,Glenmór Lamb Carvery Shoulders,,,,Glenmór,,,Irish country meats,,
157843,6771345.xml,9999999999994,0,2019-03-06,Meat & Fish,Reformed Meats,Burgers,Hogan's Farm 12 Buffalo Turkey Burgers and 12 ...,"Sulphur Dioxide/Sulphites, Contains",,,Hogan's Farm,,"Turkey (75%), Buffalo Glaze (Acidity Regulator...","High in protein, Low in fat, Fresh class A tur...",,


In [4]:
out_df.to_csv(r'Data/Bulk Extract Full_oct_2024.csv', index = False)

In [1]:
import paramiko
import psycopg2
import io
from tqdm import tqdm
import os
import pandas as pd
from Scripts.XML_Extraction.Extract_XML import extract_from_xml_string

# PostgreSQL connection details
db_config = {
    'database': 'Products_Phrases',
    'user': 'Jack Simpson@impactscoredb',
    'password': 'hycMK>~4~3',  # Replace with actual password
    'host': 'impactscoredb.postgres.database.azure.com',
    'port': '5432'
}

# Server details
hostname = 'ec2-18-168-204-252.eu-west-2.compute.amazonaws.com'
username = 'deploy'
remote_dir = '/mnt/brandbank/'
pem_file_path = r'G:\$00-Work\ImpactScore\Connecting to EC2\impactscore.pem'

# Define columns for the DataFrame
columns = [
    'name', 'gtin', 'isDelete', 'date', 'pl1', 'pl2', 'pl3', 
    'description', 'allergyAdvice', 'recycling', 'recycling_other', 
    'brands', 'marketing', 'ingredients', 'features', 'storage', 'preserves'
]

# Step 1: Retrieve the highest XML ID in the database
def get_highest_xml_id():
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        
        # Fetch all XML IDs from the database and get the highest ID
        cursor.execute("SELECT xml_id FROM brandbankxml;")
        max_xml_id = max(int(row[0][:-4]) for row in cursor.fetchall())
        
        cursor.close()
        conn.close()
        print(f"Highest XML ID in the database: {max_xml_id}")
        return max_xml_id
    except Exception as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return 0  # If there are no IDs in the database, start from 0

# Function to process new XML files and return extracted data as a DataFrame
def collect_new_xml_data():
    # Retrieve the highest XML ID from the database
    max_xml_id_in_db = get_highest_xml_id()

    # Establish SSH connection
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname, username=username, key_filename=pem_file_path)
    sftp = ssh.open_sftp()
    
    # Initialize an empty list to collect data for DataFrame
    extracted_data_list = []

    # Traverse only first-level directories within the brandbank folder
    print("Traversing first-level directories to find new XML files...")

    with tqdm(desc="Processing Product Folders") as pbar:
        for product_folder in sftp.listdir_attr(remote_dir):
            product_folder_path = f"{remote_dir}/{product_folder.filename}"  # Use '/' as separator
            
            # Ensure we only go one level deep and skip if it's not a directory
            if not (product_folder.st_mode & 0o40000):  # Check if it's a directory
                continue
            
            # Process XML files in the product folder
            for item in sftp.listdir_attr(product_folder_path):
                if item.filename.endswith('.xml'):
                    xml_id = int(item.filename[:-4])  # Convert filename to integer by removing ".xml"
                    item_path = f"{product_folder_path}/{item.filename}"  # Construct path with '/' separator
                    
                    # Process file only if its ID is higher than the max ID in the database
                    if xml_id > max_xml_id_in_db:
                        try:
                            # Read the file content
                            with sftp.open(item_path, 'r') as file:
                                xml_content = file.read().decode('utf-8')
                                
                                # Extract data and append to the list
                                extracted_data = extract_from_xml_string(xml_content, item.filename)
                                extracted_data_list.append(extracted_data)
                                
                        except FileNotFoundError:
                            print(f"File not found: {item_path}")

            pbar.update(1)

    # Close SFTP and SSH connections
    sftp.close()
    ssh.close()

    # Create a DataFrame from the collected data and return it
    out_df = pd.DataFrame(extracted_data_list, columns=columns)
    print("Data collection complete. Returning DataFrame.")
    return out_df

# Run the function to collect data and print the resulting DataFrame
out_df = collect_new_xml_data()
print(out_df)


Highest XML ID in the database: 12698297
Traversing first-level directories to find new XML files...


Processing Product Folders: 14203it [43:30,  5.44it/s]


KeyboardInterrupt: 

In [None]:
def get_highest_xml_id():
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        
        # Fetch all XML IDs from the database and get the highest ID
        cursor.execute("SELECT xml_id FROM brandbankxml;")
        max_xml_id = max(int(row[0][:-4]) for row in cursor.fetchall())
        
        cursor.close()
        conn.close()
        print(f"Highest XML ID in the database: {max_xml_id}")
        return max_xml_id
    except Exception as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return 0  # If there are no IDs in the database, start from 0

compare against all xml_ids

In [2]:
import paramiko
import psycopg2
import io
import os
from tqdm import tqdm
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from Scripts.XML_Extraction.Extract_XML import extract_from_xml_string  # Assuming this is your function

# PostgreSQL connection details
db_config = {
    'database': 'Products_Phrases',
    'user': 'Jack Simpson@impactscoredb',
    'password': 'hycMK>~4~3',  # Replace with actual password
    'host': 'impactscoredb.postgres.database.azure.com',
    'port': '5432'
}

# Server details
hostname = 'ec2-18-168-204-252.eu-west-2.compute.amazonaws.com'
username = 'deploy'
remote_dir = '/mnt/brandbank/'
pem_file_path = r'G:\$00-Work\ImpactScore\Connecting to EC2\impactscore.pem'

# Define columns for the DataFrame
columns = [
    'name', 'gtin', 'isDelete', 'date', 'pl1', 'pl2', 'pl3', 
    'description', 'allergyAdvice', 'recycling', 'recycling_other', 
    'brands', 'marketing', 'ingredients', 'features', 'storage', 'preserves'
]

# Step 1: Retrieve all XML IDs in the database and store them in a set
def get_all_xml_ids():
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        
        # Fetch all XML IDs from the database
        cursor.execute("SELECT xml_id FROM brandbankxml;")
        xml_ids = cursor.fetchall()
        
        # Convert to a set for fast lookup
        xml_ids_set = {xml_id[0] for xml_id in xml_ids}
        
        cursor.close()
        conn.close()
        print(f"Total XML IDs retrieved from the database: {len(xml_ids_set)}")
        return xml_ids_set
    except Exception as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return set()

# Function to process each XML file using a separate SSH connection
def process_file(filename, filepath):
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostname, username=username, key_filename=pem_file_path)
        sftp = ssh.open_sftp()

        with sftp.file(filepath, 'r') as file:
            xml_content = file.read().decode('utf-8')
            product_data = extract_from_xml_string(xml_content, filename)

        sftp.close()
        ssh.close()
        return product_data
    except Exception as e:
        print(f"Failed to process {filename}: {e}")
        return None

# Step 2: Collect a list of XML files to process
def find_all_xml_files(sftp, path, existing_xml_ids):
    xml_files = []
    folders = [path]

    with tqdm(total=len(folders), desc="Processing Folders") as folder_bar:
        while folders:
            current_folder = folders.pop(0)
            try:
                for item in sftp.listdir_attr(current_folder):
                    item_path = f"{current_folder}/{item.filename}"
                    if item.st_mode & 0o40000:
                        folders.append(item_path)
                    elif item.filename.endswith('.xml'):
                        if item.filename not in existing_xml_ids:  # Check if the XML ID is not in the set
                            xml_files.append(item_path)
            except PermissionError:
                print(f"Permission denied for folder: {current_folder}. Skipping...")
            except Exception as e:
                print(f"Error accessing {current_folder}: {e}")
            folder_bar.update(1)

    return xml_files

# Main function to collect and process new XML files
def collect_new_xml_data():
    existing_xml_ids = get_all_xml_ids()

    # Initial SSH connection to gather XML file paths
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname, username=username, key_filename=pem_file_path)
    sftp = ssh.open_sftp()

    print("Searching for XML files in all subdirectories...")
    files_to_process = find_all_xml_files(sftp, remote_dir, existing_xml_ids)
    sftp.close()
    ssh.close()

    # Process files in parallel using multiple connections
    extracted_data_list = []
    print("Starting parallel processing of files...")
    with ThreadPoolExecutor(max_workers=8) as executor:
        future_to_file = {executor.submit(process_file, os.path.basename(file), file): file for file in files_to_process}

        with tqdm(total=len(files_to_process), desc="Processing XML Files") as pbar:
            for future in as_completed(future_to_file):
                result = future.result()
                if result:
                    extracted_data_list.append(result)
                pbar.update(1)
    
    # Create DataFrame from collected data
    out_df = pd.DataFrame(extracted_data_list)
    print("Data collection complete. Returning DataFrame.")
    return out_df

# Run the function to collect data and print the resulting DataFrame
out_df2 = collect_new_xml_data()
out_df2.to_csv('data/new_xml2.csv', index=False)
print(out_df2)


Total XML IDs retrieved from the database: 894837
Searching for XML files in all subdirectories...


Processing Folders: 313280it [8:10:24, 12.32it/s]                 

Permission denied for folder: /mnt/brandbank//lost+found. Skipping...


Processing Folders: 478949it [12:24:06,  7.12it/s]Socket exception: An existing connection was forcibly closed by the remote host (10054)
Processing Folders: 483797it [12:24:27, 380.29it/s]

Error accessing /mnt/brandbank//5060510763425: Server connection dropped: 
Error accessing /mnt/brandbank//3307210254122: Socket is closed
Error accessing /mnt/brandbank//5000225055002: Socket is closed
Error accessing /mnt/brandbank//0000001291810: Socket is closed
Error accessing /mnt/brandbank//5051413597347: Socket is closed
Error accessing /mnt/brandbank//5000328139173: Socket is closed
Error accessing /mnt/brandbank//8005110630415: Socket is closed
Error accessing /mnt/brandbank//5056236010356: Socket is closed
Error accessing /mnt/brandbank//0000000439169: Socket is closed
Error accessing /mnt/brandbank//5050854518386: Socket is closed
Error accessing /mnt/brandbank//0256770000003: Socket is closed
Error accessing /mnt/brandbank//5063249930984: Socket is closed
Error accessing /mnt/brandbank//0000000508216: Socket is closed
Error accessing /mnt/brandbank//5052715097252: Socket is closed
Error accessing /mnt/brandbank//5000169032992: Socket is closed
Error accessing /mnt/brandban

Processing Folders: 493566it [12:24:28, 2248.13it/s]

Error accessing /mnt/brandbank//5050565168269: Socket is closed
Error accessing /mnt/brandbank//5060139660662: Socket is closed
Error accessing /mnt/brandbank//5000169081556: Socket is closed
Error accessing /mnt/brandbank//5060108908375: Socket is closed
Error accessing /mnt/brandbank//5054781732761: Socket is closed
Error accessing /mnt/brandbank//5051413097458: Socket is closed
Error accessing /mnt/brandbank//5000169304488: Socket is closed
Error accessing /mnt/brandbank//8001090073136: Socket is closed
Error accessing /mnt/brandbank//9003600481836: Socket is closed
Error accessing /mnt/brandbank//0600753810903: Socket is closed
Error accessing /mnt/brandbank//5392001006309: Socket is closed
Error accessing /mnt/brandbank//5022896821861: Socket is closed
Error accessing /mnt/brandbank//8720182865199: Socket is closed
Error accessing /mnt/brandbank//5060366570529: Socket is closed
Error accessing /mnt/brandbank//5057753931735: Socket is closed
Error accessing /mnt/brandbank//01284508

Processing Folders: 502280it [12:24:28, 11.24it/s]  


Error accessing /mnt/brandbank//5029053019635: Socket is closed
Error accessing /mnt/brandbank//5013967016521: Socket is closed
Error accessing /mnt/brandbank//5060477670002: Socket is closed
Error accessing /mnt/brandbank//0883314406672: Socket is closed
Error accessing /mnt/brandbank//5014198003977: Socket is closed
Error accessing /mnt/brandbank//5000169270035: Socket is closed
Error accessing /mnt/brandbank//5033729827417: Socket is closed
Error accessing /mnt/brandbank//5010251796565: Socket is closed
Error accessing /mnt/brandbank//5000169090350: Socket is closed
Error accessing /mnt/brandbank//5012547001797: Socket is closed
Error accessing /mnt/brandbank//5017375577070: Socket is closed
Error accessing /mnt/brandbank//3858886460602: Socket is closed
Error accessing /mnt/brandbank//5011047201003: Socket is closed
Error accessing /mnt/brandbank//5000168096551: Socket is closed
Error accessing /mnt/brandbank//03274545: Socket is closed
Error accessing /mnt/brandbank//5904224107420

Processing XML Files:   0%|          | 14/15090 [00:20<4:02:48,  1.03it/s]

Failed to process 10284404.xml: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
Failed to process 12684303.xml: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
Failed to process 9485957.xml: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
Failed to process 12500701.xml: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond


Processing XML Files:  93%|█████████▎| 14066/15090 [35:42<01:52,  9.14it/s]

Failed to process 13806292.xml: unclosed token: line 198, column 8


Processing XML Files: 100%|██████████| 15090/15090 [38:14<00:00,  6.58it/s]


Data collection complete. Returning DataFrame.
                 0               1   2           3                   4   \
0       4663234.xml   5020379124584   1  2017-08-18                       
1      10635409.xml   5000299618394   1  2022-08-04                       
2      12659917.xml   5702017600208   0  2023-12-08  Household & Garden   
3      12461053.xml   5060427350343   0  2024-01-08  Skin and Hair Care   
4      10036086.xml   5020379166355   1  2021-12-22                       
...             ...             ...  ..         ...                 ...   
15080  10340743.xml  03026981373619   1  2022-05-10                       
15081   8431139.xml   5020379144605   1  2020-10-26                       
15082  10296462.xml   0000000462259   1  2022-03-21                       
15083  10404930.xml   5010525194233   1  2022-05-03                       
15084   5947214.xml   5020379139939   1  2018-05-25                       

                                5                   

In [3]:
out_df = out_df2

Comapare against highest xml_id

In [1]:
import paramiko
import psycopg2
import io
import os
from tqdm import tqdm
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from Scripts.XML_Extraction.Extract_XML import extract_from_xml_string  # Assuming this is your function

# PostgreSQL connection details
db_config = {
    'database': 'Products_Phrases',
    'user': 'Jack Simpson@impactscoredb',
    'password': 'hycMK>~4~3',  # Replace with actual password
    'host': 'impactscoredb.postgres.database.azure.com',
    'port': '5432'
}

# Server details
hostname = 'ec2-18-168-204-252.eu-west-2.compute.amazonaws.com'
username = 'deploy'
remote_dir = '/mnt/brandbank/'
pem_file_path = r'G:\$00-Work\ImpactScore\Connecting to EC2\impactscore.pem'

# Define columns for the DataFrame
columns = [
    'name', 'gtin', 'isDelete', 'date', 'pl1', 'pl2', 'pl3', 
    'description', 'allergyAdvice', 'recycling', 'recycling_other', 
    'brands', 'marketing', 'ingredients', 'features', 'storage', 'preserves'
]

# Step 1: Retrieve the highest XML ID in the database
def get_highest_xml_id():
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        
        # Fetch the highest XML ID from the database
        cursor.execute("SELECT MAX(CAST(SUBSTRING(xml_id, 1, LENGTH(xml_id) - 4) AS INTEGER)) FROM brandbankxml;")
        max_xml_id = cursor.fetchone()[0] or 0
        
        cursor.close()
        conn.close()
        print(f"Highest XML ID in the database: {max_xml_id}")
        return max_xml_id
    except Exception as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return 0

# Function to process each XML file using a separate SSH connection
def process_file(filename, filepath):
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostname, username=username, key_filename=pem_file_path)
        sftp = ssh.open_sftp()

        with sftp.file(filepath, 'r') as file:
            xml_content = file.read().decode('utf-8')
            product_data = extract_from_xml_string(xml_content, filename)

        sftp.close()
        ssh.close()
        return product_data
    except Exception as e:
        print(f"Failed to process {filename}: {e}")
        return None

# Step 2: Collect a list of XML files to process
def find_all_xml_files(sftp, path, max_xml_id):
    xml_files = []
    folders = [path]

    with tqdm(total=len(folders), desc="Processing Folders") as folder_bar:
        while folders:
            current_folder = folders.pop(0)
            try:
                for item in sftp.listdir_attr(current_folder):
                    item_path = f"{current_folder}/{item.filename}"
                    if item.st_mode & 0o40000:
                        folders.append(item_path)
                    elif item.filename.endswith('.xml'):
                        xml_id = int(item.filename[:-4])  # Extract numerical ID from filename
                        if xml_id > max_xml_id:  # Only include files with IDs greater than max_xml_id
                            xml_files.append(item_path)
            except PermissionError:
                print(f"Permission denied for folder: {current_folder}. Skipping...")
            except Exception as e:
                print(f"Error accessing {current_folder}: {e}")
            folder_bar.update(1)

    return xml_files

# Main function to collect and process new XML files
def collect_new_xml_data():
    max_xml_id_in_db = get_highest_xml_id()

    # Initial SSH connection to gather XML file paths
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname, username=username, key_filename=pem_file_path)
    sftp = ssh.open_sftp()

    print("Searching for XML files in all subdirectories...")
    files_to_process = find_all_xml_files(sftp, remote_dir, max_xml_id_in_db)
    sftp.close()
    ssh.close()

    # Process files in parallel using multiple connections
    extracted_data_list = []
    print("Starting parallel processing of files...")
    with ThreadPoolExecutor(max_workers=8) as executor:
        future_to_file = {executor.submit(process_file, os.path.basename(file), file): file for file in files_to_process}

        with tqdm(total=len(files_to_process), desc="Processing XML Files") as pbar:
            for future in as_completed(future_to_file):
                result = future.result()
                if result:
                    extracted_data_list.append(result)
                pbar.update(1)
    
    # Create DataFrame from collected data
    out_df = pd.DataFrame(extracted_data_list)
    print("Data collection complete. Returning DataFrame.")
    return out_df

# Run the function to collect data and print the resulting DataFrame
out_df = collect_new_xml_data()
out_df.to_csv('data/new_xml.csv', index=False)
print(out_df)


Highest XML ID in the database: 12698297
Searching for XML files in all subdirectories...


Processing Folders: 313281it [7:53:50, 12.32it/s]                 

Permission denied for folder: /mnt/brandbank//lost+found. Skipping...


Processing Folders: 571692it [14:20:45, 11.07it/s]


Starting parallel processing of files...


Processing XML Files:  89%|████████▉ | 70969/79472 [2:54:51<15:21,  9.22it/s]  

Failed to process 13806292.xml: unclosed token: line 198, column 8


Processing XML Files: 100%|██████████| 79472/79472 [3:15:43<00:00,  6.77it/s]


Data collection complete. Returning DataFrame.
                 0              1   2           3                   4   \
0      13104033.xml  5050854280047   0  2024-08-01       Dairy & Bread   
1      12764831.xml       00389976   0  2024-01-29  Household & Garden   
2      13740753.xml  5059697755262   0  2024-09-17  Household & Garden   
3      13234327.xml  5057753923136   0  2024-07-08             Grocery   
4      13033043.xml  5050665050136   0  2024-05-20    Ready Made Foods   
...             ...            ...  ..         ...                 ...   
79466  12967228.xml  5010459000891   0  2024-04-12              Drinks   
79467  13028180.xml  5060206743861   0  2024-04-09  Fruit & Vegetables   
79468  12947527.xml  5011308000208   0  2024-04-08             Grocery   
79469  13420819.xml  5054781461234   0  2024-07-29             Grocery   
79470  12832749.xml  5063089158609   0  2024-03-07             Grocery   

                            5                          6   \
0  

In [4]:
backup = out_df

In [6]:
out_df=out_df.iloc[:, :-1]
out_df.columns = columns

In [7]:
out_df

Unnamed: 0,name,gtin,isDelete,date,pl1,pl2,pl3,description,allergyAdvice,recycling,recycling_other,brands,marketing,ingredients,features,storage,preserves
0,4663234.xml,5020379124584,1,2017-08-18,,,,,,,,,,,,,
1,10635409.xml,5000299618394,1,2022-08-04,,,,,,,,,,,,,
2,12659917.xml,5702017600208,0,2023-12-08,Household & Garden,For Kids,Toys,LEGO Creator 3in1 Hamster Wheel Animal Toy Set...,,,,Lego,,,,,
3,12461053.xml,5060427350343,0,2024-01-08,Skin and Hair Care,"Soaps, Deodorants & Bathing",Soap & Liquid Soaps,Kids Stuff Blue Foaming Soap 225ml,,,Recycle when empty. Do not crush,Kids Stuff,,"Aqua (Water), Butane, Isobutane, Propane, Trie...","Blue Foaming Soap, Fun Fruity Fragrance, Gentl...",,
4,10036086.xml,5020379166355,1,2021-12-22,,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
15080,10340743.xml,03026981373619,1,2022-05-10,,,,,,,,,,,,,
15081,8431139.xml,5020379144605,1,2020-10-26,,,,,,,,,,,,,
15082,10296462.xml,0000000462259,1,2022-03-21,,,,,,,,,,,,,
15083,10404930.xml,5010525194233,1,2022-05-03,,,,,,,,,,,,,


Extract and Process Only the Missing XML Files

In [8]:
# Check each cell in the selected columns to see if it contains a list
list_rows = out_df[[
    'description',
    'allergyAdvice',
    'recycling',
    'recycling_other',
    'brands',
    'marketing',
    'ingredients',
    'features',
    'storage',
    'preserves'
]].applymap(lambda x: isinstance(x, list)).any(axis=1)

# Display rows that contain lists
rows_with_lists = out_df[list_rows]
print(rows_with_lists)


               name            gtin  isDelete        date                 pl1  \
287    12670235.xml   4060800304353         0  2023-12-15              Drinks   
297    12670525.xml   5060963011906         0  2023-12-18    Ready Made Foods   
380    12660701.xml   0811670032174         0  2024-03-03       Dairy & Bread   
683    12617751.xml   5000101514265         0  2024-02-20  Skin and Hair Care   
847    12684373.xml   5000393171689         0  2023-12-28         Off Licence   
...             ...             ...       ...         ...                 ...   
14531  12696412.xml   7613036288644         0  2024-01-05                Pets   
14581  12677084.xml  03574661332505         0  2023-12-21  Skin and Hair Care   
14610  12677110.xml   5000171058539         0  2023-12-19             Grocery   
14814  12687005.xml   5060078180375         0  2024-01-03         Off Licence   
14983  12677108.xml   5000171058485         0  2023-12-19             Grocery   

                           

  ]].applymap(lambda x: isinstance(x, list)).any(axis=1)


In [9]:
# Replace empty lists with empty strings across all columns in the DataFrame
out_df = out_df.applymap(lambda x: '' if x == [] else x)

  out_df = out_df.applymap(lambda x: '' if x == [] else x)


In [10]:
# Apply the transformations
out_df = out_df.fillna('')
out_df['text'] = out_df[[
    'description',
    'allergyAdvice',
    'recycling',
    'recycling_other',
    'brands',
    'marketing',
    'ingredients',
    'features',
    'storage',
    'preserves']].agg(' '.join, axis=1)

out_df.loc[out_df['isDelete'] == 1, 'text'] = ''
out_df['text'] = out_df['text'].str.lower()
out_df['text'] = out_df['text'].str.replace('\n', '', regex=True)

print("Data transformation completed.")
out_df

Data transformation completed.


Unnamed: 0,name,gtin,isDelete,date,pl1,pl2,pl3,description,allergyAdvice,recycling,recycling_other,brands,marketing,ingredients,features,storage,preserves,text
0,4663234.xml,5020379124584,1,2017-08-18,,,,,,,,,,,,,,
1,10635409.xml,5000299618394,1,2022-08-04,,,,,,,,,,,,,,
2,12659917.xml,5702017600208,0,2023-12-08,Household & Garden,For Kids,Toys,LEGO Creator 3in1 Hamster Wheel Animal Toy Set...,,,,Lego,,,,,,lego creator 3in1 hamster wheel animal toy set...
3,12461053.xml,5060427350343,0,2024-01-08,Skin and Hair Care,"Soaps, Deodorants & Bathing",Soap & Liquid Soaps,Kids Stuff Blue Foaming Soap 225ml,,,Recycle when empty. Do not crush,Kids Stuff,,"Aqua (Water), Butane, Isobutane, Propane, Trie...","Blue Foaming Soap, Fun Fruity Fragrance, Gentl...",,,kids stuff blue foaming soap 225ml recycle w...
4,10036086.xml,5020379166355,1,2021-12-22,,,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
15080,10340743.xml,03026981373619,1,2022-05-10,,,,,,,,,,,,,,
15081,8431139.xml,5020379144605,1,2020-10-26,,,,,,,,,,,,,,
15082,10296462.xml,0000000462259,1,2022-03-21,,,,,,,,,,,,,,
15083,10404930.xml,5010525194233,1,2022-05-03,,,,,,,,,,,,,,


In [11]:
out_df["text_tsvector"] = ""

Insert Data into PostgreSQL

In [12]:
import psycopg2
from psycopg2.extras import execute_values

def insert_data_without(df):
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        
        # Insert query without tsvector_data, to be calculated in PostgreSQL later
        insert_query = """
            INSERT INTO brandbankxml (xml_id, gtin, "isDelete", date, pl1, pl2, pl3, description, "allergyAdvice", 
                                      recycling, recycling_other, brands, marketing, ingredients, features, 
                                      storage, preserves, text, text_tsvector)
            VALUES %s
            ON CONFLICT (xml_id) DO NOTHING
        """
        
        # Prepare the data to be inserted
        data = df.to_records(index=False).tolist()

        # Use execute_values to perform a bulk insert
        execute_values(cursor, insert_query, data)
        
        conn.commit()
        cursor.close()
        conn.close()
        print(f"Inserted {len(data)} new records into the database.")
    except Exception as e:
        print(f"Error inserting data into PostgreSQL: {e}")


In [13]:
insert_data_without(out_df)

Inserted 15085 new records into the database.


In [15]:
def update_tsvector_column():
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        
        # SQL command to populate tsvector_data column based on the text column
        update_query = """
            UPDATE brandbankxml
            SET text_tsvector = to_tsvector('english', text)
            WHERE text_tsvector IS NULL OR text_tsvector = '';
        """
        
        cursor.execute(update_query)
        conn.commit()
        cursor.close()
        conn.close()
        print("tsvector_data column updated successfully.")
    except Exception as e:
        print(f"Error updating text_tsvector column: {e}")


In [16]:
# Then, update the tsvector column
update_tsvector_column()

tsvector_data column updated successfully.


In [17]:
max_xml_id_in_db = get_highest_xml_id()
max_xml_id_in_db

NameError: name 'get_highest_xml_id' is not defined

In [37]:
out_df

Unnamed: 0,name,gtin,isDelete,date,pl1,pl2,pl3,description,allergyAdvice,recycling,recycling_other,brands,marketing,ingredients,features,storage,preserves,text,text_tsvector
0,13104033.xml,5050854280047,0,2024-08-01,Dairy & Bread,Milk & Cream,Fresh Cream,Asda Fresh Double Cream 250ml,"Milk, Contains","Film, Do Not Recycle",Clean - Pot & Lid - Recycle,Asda,"Rich and thick, pour over treacle sponge or mi...",Double Cream (Milk),,"Do Not Freeze, Keep Refrigerated",,"asda fresh double cream 250ml milk, contains f...",
1,12764831.xml,00389976,0,2024-01-29,Household & Garden,Plants,Cut Flowers,Beautiful Smile,,,,Unbranded,,,,,,beautiful smile unbranded,
2,13740753.xml,5059697755262,0,2024-09-17,Household & Garden,Utilities,Other,Tesco F&I 400TC CTN Tencel Flexi Fitted Sheet ...,,,,Tesco,,,,,,tesco f&i 400tc ctn tencel flexi fitted sheet ...,
3,13234327.xml,5057753923136,0,2024-07-08,Grocery,Sauces & Condiments,Other Sauces & Condiments,Tesco Fruity Brown Sauce 445g,,"Bottle, Widely Recycled",,Tesco,,"Tomato Purée, Spirit Vinegar, Sugar, Water, Ap...","Sweet & tangy, Subtly spiced for a rounded fru...",,,"tesco fruity brown sauce 445g bottle, widely ...",
4,13033043.xml,5050665050136,0,2024-05-20,Ready Made Foods,Meals,Other,Mug Shot Max Roast Chicken Flavour Pasta 68g,"Barley, Contains, Cereals Containing Gluten, M...","Lid, Do Not Recycle, Pot, Recycle, Sleeve, Rec...",,Mug Shot,Have a mug shot mmmoment!\nRamp up your lunch ...,"Dried Pasta [Durum Wheat Semolina], Natural Fl...","Flavour to the Max, Ready in 5 Minutes, Made w...",,,mug shot max roast chicken flavour pasta 68g b...,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
79466,12967228.xml,5010459000891,0,2024-04-12,Drinks,Water,Still Waters,Highland Spring Strawberry Still Spring Water ...,,,"Recycle me, This bottle is made from 100% recy...",Highland Spring,A fruity burst of natural strawberry flavour m...,"Still Spring Water, Acid (Citric Acid), Natura...",Sugar Free,,,highland spring strawberry still spring water ...,
79467,13028180.xml,5060206743861,0,2024-04-09,Fruit & Vegetables,Core Vegetables,Carrots,Gilfresh Family Grown Carrots 1kg,,"Film, Plastic - Widely Recycled, Pack, Recycle...",,Gilfresh,,Carrots,Sweet & Crunchy,Keep Refrigerated,,"gilfresh family grown carrots 1kg film, plast...",
79468,12947527.xml,5011308000208,0,2024-04-08,Grocery,Speciality/Ethnic Foods,Indian Cooking Sauces,Patak's Hot Lime Pickle 283g,"Mustard, Contains, Nuts, May Contain, Peanuts,...",,Rinse / Lid on - Recycle,Patak's,,"Limes (64%), Rapeseed Oil, Salt, Ground Spices...","Spicy & Zesty, Chilli rating - Hot - 3, No Art...",,,"patak's hot lime pickle 283g mustard, contains...",
79469,13420819.xml,5054781461234,0,2024-07-29,Grocery,Sauces & Condiments,Dips,Asda Sour Cream & Chive 290g,"Eggs, Contains, Milk, Contains",,Clean - Jar & Lid - Recycle,Asda,,"Water, Rapeseed Oil, Soured Cream (Milk) (7%),...","Creamy and mild, perfect for dunking and dippi...",,,"asda sour cream & chive 290g eggs, contains, m...",
