In [1]:
import json
import csv
from datetime import datetime

def convert_json_to_csv(input_file, output_file):
    """Convert a YouTube watch-history JSON file into a classified CSV file.

    Only dict entries whose ``header`` is ``'YouTube'`` are exported. Each
    exported entry becomes one row with the columns
    ``id, activity_type, title, link, dt, description``.

    ``activity_type`` is decided in this order:

    * ``'advertisement'`` - a ``details`` entry is named ``'From Google Ads'``
      (the title is left untouched);
    * ``'search'`` - the title starts with ``'Searched for '``;
    * ``'shorts'`` - the title contains ``#shorts`` (case-insensitive);
    * ``'watch'`` - the title starts with ``'Watched '``;
    * ``'other'`` - anything else.

    The ``'Searched for '`` / ``'Watched '`` prefix is stripped from the
    exported title for the search, shorts and watch types.

    Args:
        input_file: Path of the JSON file to read (UTF-8, top-level list).
        output_file: Path of the CSV file to write (overwritten if present).

    Raises:
        OSError: If either file cannot be opened.
        json.JSONDecodeError: If ``input_file`` is not valid JSON.
    """
    searched_prefix = 'Searched for '
    watched_prefix = 'Watched '

    # Read JSON data
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Prepare CSV data
    csv_data = []
    for item in data:
        if not isinstance(item, dict) or item.get('header') != 'YouTube':
            continue

        # `or ''` also covers keys that are present but null in the JSON.
        title = item.get('title') or ''
        link = item.get('titleUrl', '')
        description = item.get('description', '')

        # Convert the ISO timestamp to minute resolution. A trailing 'Z' is
        # rewritten as '+00:00' so that fromisoformat() accepts it on every
        # Python 3 version. Missing or unparseable values become ''.
        formatted_time = ''
        raw_time = item.get('time')
        if isinstance(raw_time, str):
            try:
                parsed = datetime.fromisoformat(raw_time.replace('Z', '+00:00'))
                formatted_time = parsed.strftime('%Y-%m-%d %H:%M')
            except ValueError:
                formatted_time = ''  # Deliberate best effort: keep the row.

        # Determine activity type
        details = item.get('details') or []
        is_advert = any(
            isinstance(detail, dict) and detail.get('name') == 'From Google Ads'
            for detail in details
        )

        if is_advert:
            activity_type = 'advertisement'
        elif title.startswith(searched_prefix):
            # Match the prefix only: a substring test would misclassify a
            # watched video whose own title contains "Searched for".
            activity_type = 'search'
            title = title[len(searched_prefix):]
        elif '#shorts' in title.lower():
            activity_type = 'shorts'
            if title.startswith(watched_prefix):
                title = title[len(watched_prefix):]
        elif title.startswith(watched_prefix):
            activity_type = 'watch'
            title = title[len(watched_prefix):]
        else:
            activity_type = 'other'

        # ids are 1-based and count exported rows only.
        csv_data.append(
            [len(csv_data) + 1, activity_type, title, link, formatted_time, description]
        )

    # Write CSV file
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['id', 'activity_type', 'title', 'link', 'dt', 'description'])  # Write header
        writer.writerows(csv_data)

    print(f"CSV file '{output_file}' has been created successfully.")

# Usage: convert the watch-history JSON file into a CSV file.
# Both paths are relative to the notebook's working directory.
input_file = 'watch-history-oli.json'
output_file = 'youtube_watch_history_oli.csv'
convert_json_to_csv(input_file, output_file)

CSV file 'youtube_watch_history_oli.csv' has been created successfully.


### Search history

In [2]:
import json
import csv
from datetime import datetime

def convert_json_to_csv(input_file, output_file):
    """Convert a YouTube search-history JSON file into a classified CSV file.

    Only dict entries whose ``header`` is ``'YouTube'`` are exported, and
    entries with a ``details`` item named ``'From Google Ads'`` are skipped.
    Each exported entry becomes one row with the columns
    ``id, activity_type, title, link, dt, description``.

    ``activity_type`` is ``'search'`` when the title starts with
    ``'Searched for '``, ``'watch'`` when it starts with ``'Watched '`` and
    ``'other'`` otherwise; the matched prefix is stripped from the title.

    ``dt`` is written as ``'%Y-%m-%d %H:%M'``, or as an empty string when the
    ``time`` value is missing or cannot be parsed.

    Args:
        input_file: Path of the JSON file to read (UTF-8, top-level list).
        output_file: Path of the CSV file to write (overwritten if present).

    Raises:
        OSError: If either file cannot be opened.
        json.JSONDecodeError: If ``input_file`` is not valid JSON.
    """
    searched_prefix = 'Searched for '
    watched_prefix = 'Watched '

    # Read JSON data
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Prepare CSV data
    csv_data = []
    for item in data:
        if not isinstance(item, dict) or item.get('header') != 'YouTube':
            continue

        # Skip this entry if it's from Google Ads. `or []` also covers a
        # `details` key that is present but null.
        details = item.get('details') or []
        if any(
            isinstance(detail, dict) and detail.get('name') == 'From Google Ads'
            for detail in details
        ):
            continue

        title = item.get('title') or ''
        link = item.get('titleUrl', '')
        description = item.get('description', '')

        # Write the *formatted* string, not the datetime object: str(datetime)
        # would add seconds and the UTC offset to the 'dt' column.
        formatted_datetime = ''
        raw_time = item.get('time')
        if isinstance(raw_time, str):
            try:
                parsed = datetime.fromisoformat(raw_time.replace('Z', '+00:00'))
                formatted_datetime = parsed.strftime('%Y-%m-%d %H:%M')
            except ValueError:
                formatted_datetime = ''  # Deliberate best effort: keep the row.

        # Match prefixes only, so a title that merely contains the phrase
        # "Searched for" is not treated as a search.
        if title.startswith(searched_prefix):
            activity_type = 'search'
            title = title[len(searched_prefix):]
        elif title.startswith(watched_prefix):
            activity_type = 'watch'
            title = title[len(watched_prefix):]
        else:
            activity_type = 'other'

        # ids are 1-based and count exported rows only.
        csv_data.append(
            [len(csv_data) + 1, activity_type, title, link, formatted_datetime, description]
        )

    # Write CSV file
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['id', 'activity_type', 'title', 'link', 'dt', 'description'])  # Write header
        writer.writerows(csv_data)

    print(f"CSV file '{output_file}' has been created successfully.")

# Usage: convert the search-history JSON file into a CSV file.
# Note: this rebinds the module-level input_file / output_file names.
input_file = 'search-history-oli.json'
output_file = 'youtube_search_history_oli.csv'
convert_json_to_csv(input_file, output_file)

CSV file 'youtube_search_history_oli.csv' has been created successfully.


### Combine watch and search history

In [3]:
import csv
from datetime import datetime, timedelta
from dateutil import parser
import pytz

def parse_datetime(dt_string):
    """Parse a timestamp string into a timezone-aware ``datetime``.

    Args:
        dt_string: Text to parse with ``dateutil.parser.parse``. ``None`` and
            the empty string are accepted and treated as "no timestamp".

    Returns:
        The parsed ``datetime``. If the parsed value carries no timezone it is
        tagged as UTC. Returns ``None`` when the value is missing or cannot be
        parsed.
    """
    # Empty CSV cells are '' and short rows yield None; neither is a date.
    if not dt_string:
        return None
    try:
        dt = parser.parse(dt_string)
    except (ValueError, OverflowError, TypeError):
        # ValueError covers dateutil's ParserError; OverflowError is raised
        # for out-of-range dates and TypeError for non-string input.
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=pytz.UTC)
    return dt

def combine_and_classify_csv_files(watch_file, search_file, output_file,
                                   max_gap=timedelta(minutes=5)):
    """Merge the watch and search CSV files and flag watches that follow a search.

    Rows from both files are tagged with a ``source`` column (``'watch'`` or
    ``'search'``), sorted chronologically (oldest first, rows without a
    parseable ``dt`` first) and written to ``output_file`` with fresh
    sequential ids.

    Classification: every row read from ``search_file`` with a parseable
    timestamp is remembered as the latest search, regardless of its
    ``activity_type``. The next row from ``watch_file`` whose ``activity_type``
    is ``'watch'`` is relabelled ``'post-search-watch'`` if it is strictly
    later than that search and at most ``max_gap`` after it. Whether or not it
    qualifies, that watch row consumes the remembered search, so one search
    can label at most one watch.

    Args:
        watch_file: Path of the watch-history CSV (needs ``dt`` and
            ``activity_type`` columns).
        search_file: Path of the search-history CSV (needs a ``dt`` column).
        output_file: Path of the combined CSV to write (overwritten if present).
        max_gap: Maximum time between a search and the following watch for
            the watch to count as search-driven. Defaults to 5 minutes.

    Raises:
        OSError: If any of the files cannot be opened.
        KeyError: If an input file has no ``dt`` column.
    """
    combined_data = []

    # Read both histories, remembering which file each row came from.
    # newline='' is what the csv module recommends for files it reads.
    for path, source in ((watch_file, 'watch'), (search_file, 'search')):
        with open(path, 'r', newline='', encoding='utf-8') as f:
            for row in csv.DictReader(f):
                row['source'] = source
                combined_data.append(row)

    # Parse every timestamp exactly once; the result is reused for both the
    # sort and the classification pass.
    timed_rows = [(parse_datetime(row['dt']), row) for row in combined_data]

    # Sort chronologically (oldest first). Rows without a usable timestamp
    # sort to the front. The sort is stable, so ties keep their read order
    # (watch rows before search rows).
    earliest = datetime.min.replace(tzinfo=pytz.UTC)
    timed_rows.sort(key=lambda pair: pair[0] or earliest)

    # Classify post-search-watch activities
    last_search_time = None
    for current_time, row in timed_rows:
        if current_time is None:
            continue

        if row['source'] == 'search':
            last_search_time = current_time
        elif (row['source'] == 'watch'
              and row['activity_type'] == 'watch'
              and last_search_time is not None):
            if timedelta(0) < current_time - last_search_time <= max_gap:
                row['activity_type'] = 'post-search-watch'
            # The first plain watch after a search always consumes it.
            last_search_time = None

    # Write the sorted, combined data to a new CSV file
    fieldnames = ['id', 'activity_type', 'title', 'link', 'dt', 'description', 'source']
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        # Reset the id to ensure it's sequential in the combined file
        for new_id, (_, row) in enumerate(timed_rows, 1):
            row['id'] = new_id
            writer.writerow(row)

    print(f"Combined CSV file '{output_file}' has been created successfully.")

# Usage: merge the two CSV files written by the conversion cells above
# (same file names as their output_file values) into one combined CSV.
watch_file = 'youtube_watch_history_oli.csv'
search_file = 'youtube_search_history_oli.csv'
output_file = 'youtube_combined_history_oli.csv'
combine_and_classify_csv_files(watch_file, search_file, output_file)

Combined CSV file 'youtube_combined_history_oli.csv' has been created successfully.
