In [35]:
!pip install XlsxWriter



In [62]:
# === Imports ===
import json
import pandas as pd
import re
import numpy as np
import copy
import glob
from datetime import datetime

# === Utility Functions ===

def safe_get(data, path, default=""):
    """Safely get a nested value from a dictionary using a list of keys."""
    for key in path:
        if isinstance(data, dict) and key in data:
            data = data[key]
        else:
            return default
    return default if data is None else data

def extract_file_segment(item):
    try:
        images = item.get("content", {}).get("imageComponent", {}).get("images", [])
        for img in images:
            for attr in img.get("attributes", []):
                vector = attr.get("detailData", {}).get("vectorImage", {})
                for artifact in vector.get("artifacts", []):
                    if "fileIdentifyingUrlPathSegment" in artifact:
                        return artifact["fileIdentifyingUrlPathSegment"]
    except Exception:
        pass
    return ""

def extract_epoch_from_segment(segment):
    try:
        parts = segment.split("/")
        if len(parts) >= 4:
            timestamp_str = parts[3].split("?")[0]
            if timestamp_str.isdigit():
                return int(timestamp_str)
    except Exception:
        pass
    return None

def extract_entity_id(urn_str):
    match = re.search(r'\d{6,}', urn_str or "")
    return int(match.group()) if match else None

In [64]:
# === Load HAR file ===
"""filename = "/content/drive/MyDrive/LinkedInData/www.linkedin.com.har"

with open(filename, 'r', encoding='utf-8') as f:
    har_data = json.load(f) """


# === Load ALL HAR files ===
har_folder = "/content/drive/MyDrive/LinkedInData"
har_files = glob.glob(f"{har_folder}/*.har")

print(f"✅ Found {len(har_files)} HAR files")

# Now load them one by one
all_entries = []

for filename in har_files:
    print(f"Loading: {filename}")
    with open(filename, 'r', encoding='utf-8') as f:
        har_data = json.load(f)
        # Extract entries from this file and append
        entries = har_data.get("log", {}).get("entries", [])
        all_entries.extend(entries)

# Now all_entries contains all entries from all HAR files
print(f"✅ Total entries loaded: {len(all_entries)}")

# OPTIONAL: If your extract_posts_and_social() expects har_data,
# you can fake one like this:
combined_har_data = {"log": {"entries": all_entries}}



✅ Found 10 HAR files
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com10.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com8.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com9.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com6.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com7.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com4.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com5.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com1.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com2.har
Loading: /content/drive/MyDrive/LinkedInData/www.linkedin.com3.har
✅ Total entries loaded: 25756


In [65]:
# === Extract posts & social data ===

def extract_posts_and_social(har_data):
    posts = []
    social_data = []

    for entry in har_data.get("log", {}).get("entries", []):
        url = entry.get("request", {}).get("url", "")
        if not url.startswith("https://www.linkedin.com/voyager/api/graphql?"):
            continue

        text_data = entry.get("response", {}).get("content", {}).get("text", "")
        try:
            response_json = json.loads(text_data)
        except json.JSONDecodeError:
            continue

        for item in response_json.get("included", []):
            # === Posts ===
            if item.get("$type") == "com.linkedin.voyager.dash.feed.Update":
                attributes = safe_get(item, ["actor", "name", "attributesV2"], [])
                actor_profile = safe_get(attributes[0], ["detailData", "*profileFullName"]) if attributes else ""
                actor_profile_id = actor_profile.split("profile:")[1] if "profile:" in actor_profile else ""

                file_segment = extract_file_segment(item)
                image_epoch = extract_epoch_from_segment(file_segment)
                image_datetime = datetime.fromtimestamp(image_epoch / 1000).strftime('%Y-%m-%d %H:%M:%S') if image_epoch else ""

                entity_urn = safe_get(item, ["metadata", "backendUrn"])
                header = safe_get(item, ["header", "text", "text"])

                header_attributes = safe_get(item, ["header", "text", "attributesV2"], [])
                header_profile = safe_get(header_attributes[0], ["detailData", "*profileFullName"]) if header_attributes else ""
                header_profile_id = header_profile.split("profile:")[1] if "profile:" in header_profile else ""

                commentary_attributes = safe_get(item, ["commentary", "text", "attributesV2"], [])
                company_id = safe_get(commentary_attributes[0], ["detailData", "*companyName"]) if commentary_attributes else None

                socialDetail = safe_get(item, ["*socialDetail"])
                resharedUpdate = safe_get(item, ["*resharedUpdate"])

                resharedUpdate_id = extract_entity_id(resharedUpdate)
                entity_id = extract_entity_id(entity_urn)

                matches = re.findall(r'urn:li:(?:activity|ugcPost):(\d+)\b|urn:li:groupPost:\d+-(\d+)\b', socialDetail)
                flattened_matches = [m[0] or m[1] for m in matches if m[0] or m[1]]
                sd_left = flattened_matches[0] if len(matches) >= 2 else None
                sd_right = flattened_matches[1] if len(matches) >= 2 else None

                post_data = {
                    "entity_id": str(entity_id) if entity_id else "",
                    "resharedUpdate_id": str(resharedUpdate_id) if resharedUpdate_id else "",
                    "header": header,
                    "post_text": safe_get(item, ["commentary", "text", "text"]),
                    "actor_description": safe_get(item, ["actor", "description", "text"]),
                    "actor_name": safe_get(item, ["actor", "name", "text"]),
                    "actor_profile": actor_profile_id,
                    "actor_backendUrn": safe_get(item, ["actor", "backendUrn"]),
                    "share_url": safe_get(item, ["socialContent", "shareUrl"]),
                    "file_segment": file_segment,
                    "image_epoch": image_epoch,
                    "image_datetime": image_datetime,
                    "socialDetail": socialDetail,
                    "sd_left": sd_left,
                    "sd_right": sd_right,
                    "header_profile_id": header_profile_id,
                    "company_id": company_id
                }

                posts.append(post_data)

            # === Social Data ===
            elif item.get("$type") == "com.linkedin.voyager.dash.feed.SocialActivityCounts":
                entity_urn = safe_get(item, ["entityUrn"])
                entity_id = extract_entity_id(entity_urn)

                numReactions = safe_get(item, ["numLikes"])
                numComments = safe_get(item, ["numComments"])
                numShares = safe_get(item, ["numShares"])

                reactionTypeCounts = safe_get(item, ["reactionTypeCounts"], [])
                reaction_map = {r.get("reactionType"): r.get("count", 0) for r in reactionTypeCounts}

                sd = {
                    "entity_id": entity_id,
                    "numReactions": numReactions,
                    "numLikes": reaction_map.get("LIKE", 0),
                    "numInterests": reaction_map.get("INTEREST", 0),
                    "numAppreciates": reaction_map.get("APPRECIATION", 0),
                    "numEntertains": reaction_map.get("ENTERTAINMENT", 0),
                    "numEmpathys": reaction_map.get("EMPATHY", 0),
                    "numPraises": reaction_map.get("PRAISE", 0),
                    "numComments": numComments,
                    "numShares": numShares
                }

                social_data.append(sd)

    return posts, social_data

"""
# Run extraction
posts, social_data = extract_posts_and_social(har_data)
print(f"✅ Extracted {len(posts)} posts and {len(social_data)} social records")"""

# Now run as normal:
posts, social_data = extract_posts_and_social(combined_har_data)
print(f"✅ Extracted {len(posts)} posts and {len(social_data)} social records from ALL files")

✅ Extracted 3831 posts and 3817 social records from ALL files


In [66]:
# === Deduplicate posts ===

def deduplicate_posts(posts):
    best_posts = {}

    for post in posts:
        post_id = str(post.get("entity_id", ""))
        if not post_id:
            continue

        if post_id not in best_posts:
            best_posts[post_id] = copy.deepcopy(post)
        else:
            current_best = best_posts[post_id]
            for key, value in post.items():
                if key not in current_best:
                    current_best[key] = value
                elif current_best[key] in [None, "", [], {}] and value not in [None, "", [], {}]:
                    current_best[key] = value

    return list(best_posts.values())

# Run deduplication
posts = deduplicate_posts(posts)
print(f"✅ Deduplicated to {len(posts)} unique posts")

✅ Deduplicated to 888 unique posts


In [67]:
# === Build actor lookup ===

def build_actor_lookup(posts):
    actor_lookup = {}

    for post in posts:
        for key in [post.get("actor_profile"), post.get("actor_backendUrn")]:
            if key:
                actor_lookup[key] = {
                    "actor_description": post.get("actor_description", ""),
                    "actor_name": post.get("actor_name", ""),
                    "actor_profile": post.get("actor_profile", ""),
                    "actor_backendUrn": post.get("actor_backendUrn", "")
                }
    return actor_lookup

# Build lookup
actor_lookup = build_actor_lookup(posts)
print(f"✅ Built actor lookup with {len(actor_lookup)} actors")

# === Enrich posts with social data ===

# Build social lookup
social_lookup = {str(sd["entity_id"]): sd for sd in social_data}

# Existing entity ids
existing_entity_ids = {post["entity_id"] for post in posts if "entity_id" in post}

# Helper: Create reshared post
def create_reshared_post(post):
    return {
        "entity_id": post["resharedUpdate_id"],
        "resharedUpdate_id": "",
        "post_text": post["post_text"],
        "actor_description": post["actor_description"],
        "actor_name": post["actor_name"],
        "actor_profile": post["actor_profile"],
        "actor_backendUrn": post["actor_backendUrn"],
        "share_url": post["share_url"],
        "image_epoch": post["image_epoch"],
        "image_epoch_interpolated": np.nan,
        "interpolated_time": "",
        "numReactions": post.get("numReactions", ""),
        "numLikes": post.get("numLikes", ""),
        "numInterests": post.get("numInterests", ""),
        "numAppreciates": post.get("numAppreciates", ""),
        "numEntertains": post.get("numEntertains", ""),
        "numEmpathys": post.get("numEmpathys", ""),
        "numPraises": post.get("numPraises", ""),
        "numComments": post.get("numComments", ""),
        "numShares": post.get("numShares", ""),
        "type": "Original",
        "sd_left": "",
        "sd_right": ""
    }

# Enrich and tag posts
for post in posts:
    post_id = post.get("entity_id", "")
    social_record = social_lookup.get(post_id)

    if social_record:
        post.update({k: v for k, v in social_record.items() if k != "entity_id"})

    # Handle reshared logic
    if post["sd_left"] != post["sd_right"] and post.get("type") is None:
        if post["sd_left"] not in existing_entity_ids:
            post["resharedUpdate_id"] = post["sd_left"]
            reshared_post = create_reshared_post(post)
            existing_entity_ids.add(reshared_post["entity_id"])
            posts.append(reshared_post)

        # Update current post as Share No Comment
        ref_id = post.get("header_profile_id") or post.get("company_id")
        actor = actor_lookup.get(ref_id, {})

        post.update({
            "post_text": "",
            "actor_description": actor.get("actor_description", ""),
            "actor_profile": actor.get("actor_profile", ""),
            "actor_backendUrn": actor.get("actor_backendUrn", ""),
            "actor_name": actor.get("actor_name", ""),
            "share_url": "",
            "image_epoch": "",
            "image_epoch_interpolated": np.nan,
            "interpolated_time": "",
            "numReactions": "",
            "numLikes": "",
            "numInterests": "",
            "numAppreciates": "",
            "numEntertains": "",
            "numEmpathys": "",
            "numPraises": "",
            "numComments": "",
            "numShares": "",
            "type": "Share No Comment"
        })

    elif post["resharedUpdate_id"] and not post["header"] and post.get("type") is None:
        post["type"] = "Share With Comment"

    elif (not post["resharedUpdate_id"]) and post.get("type") is None:
        post["type"] = "Original"

    # Clean up unused fields
    for field in ["socialDetail", "sd_left", "sd_right", "header_profile_id", "company_id", "file_segment", "header"]:
        if field in post:
            del post[field]

print("✅ Posts enriched and tagged")

✅ Built actor lookup with 82 actors
✅ Posts enriched and tagged


In [68]:
# === Interpolate image_epoch ===

def interpolate_epochs(df):
    known_points = []
    early_records = []
    slope = None
    first_known_entity_id = None
    first_known_epoch = None

    df["image_epoch"] = pd.to_numeric(df["image_epoch"], errors="coerce")

    for idx, row in df.iterrows():

        entity_id_numeric = int(row["entity_id"])
        current_epoch = row["image_epoch"]

        if pd.notna(current_epoch):
            known_points.append((entity_id_numeric, float(current_epoch)))
            df.at[idx, "image_epoch_interpolated"] = current_epoch

            if len(known_points) >= 2:
                eid_before, epoch_before = known_points[-2]
                eid_after, epoch_after = known_points[-1]
                slope = (epoch_after - epoch_before) / (eid_after - eid_before)

                if first_known_entity_id is None:
                    first_known_entity_id = eid_before
                    first_known_epoch = epoch_before

                for early_idx, early_eid in early_records:
                    extrapolated_epoch = first_known_epoch - slope * (first_known_entity_id - early_eid)
                    df.at[early_idx, "image_epoch_interpolated"] = int(round(extrapolated_epoch))
                    df.at[early_idx, "interpolated_time"] = datetime.fromtimestamp(df.at[early_idx, "image_epoch_interpolated"] / 1000).strftime('%Y-%m-%d %H:%M:%S')

                early_records.clear()

        else:
            if len(known_points) >= 2:
                before = [pt for pt in known_points if pt[0] < entity_id_numeric]
                after = [pt for pt in known_points if pt[0] > entity_id_numeric]

                if before and after:
                    eid_before, epoch_before = max(before, key=lambda x: x[0])
                    eid_after, epoch_after = min(after, key=lambda x: x[0])
                    local_slope = (epoch_after - epoch_before) / (eid_after - eid_before)
                    interpolated_epoch = epoch_before + local_slope * (entity_id_numeric - eid_before)

                elif before:
                    if len(before) >= 2:
                        b1, b2 = before[-2:]
                        local_slope = (b2[1] - b1[1]) / (b2[0] - b1[0])
                    else:
                        local_slope = slope or 0
                    eid_before, epoch_before = max(before, key=lambda x: x[0])
                    interpolated_epoch = epoch_before + local_slope * (entity_id_numeric - eid_before)

                elif after:
                    if len(after) >= 2:
                        a1, a2 = after[:2]
                        local_slope = (a2[1] - a1[1]) / (a2[0] - a1[0])
                    else:
                        local_slope = slope or 0
                    eid_after, epoch_after = min(after, key=lambda x: x[0])
                    interpolated_epoch = epoch_after - local_slope * (eid_after - entity_id_numeric)

                else:
                    interpolated_epoch = 0

                df.at[idx, "image_epoch_interpolated"] = int(round(interpolated_epoch))

            else:
                early_records.append((idx, entity_id_numeric))
                df.at[idx, "image_epoch_interpolated"] = np.nan

        if pd.notna(df.at[idx, "image_epoch_interpolated"]):
            df.at[idx, "interpolated_time"] = datetime.fromtimestamp(df.at[idx, "image_epoch_interpolated"] / 1000).strftime('%Y-%m-%d %H:%M:%S')

    return df

# Prepare dataframe
df = pd.DataFrame(posts)
df = df.sort_values(by="entity_id", ascending=False)

# Run interpolation
df = interpolate_epochs(df)
print("✅ Interpolation complete")

✅ Interpolation complete


In [69]:
# === Output CSV ===

df["entity_id"] = df["entity_id"].astype(str)
df["resharedUpdate_id"] = df["resharedUpdate_id"].astype(str)

"""CSV_COLUMNS = [
    "entity_id",
    "resharedUpdate_id",
    "post_text",
    "actor_name",
    "actor_description",
    "actor_profile",
    "actor_backendUrn",
    "share_url",
    "numReactions",
    "numLikes",
    "numInterests",
    "numAppreciates",
    "numEntertains",
    "numEmpathys",
    "numPraises",
    "numComments",
    "numShares",
    "image_epoch",
    "image_epoch_interpolated",
    "interpolated_time",
    "type"
]

output_file = "/content/drive/MyDrive/LinkedInData/linkedin_posts.csv"
df.to_csv(output_file, index=False, encoding="utf-8", columns=CSV_COLUMNS)
"""
print(f"✅ CSV saved as '{output_file}'")

✅ CSV saved as '/content/drive/MyDrive/LinkedInData/linkedin_posts.csv'


In [None]:
# Load the data
#file_path = "/content/drive/MyDrive/LinkedInData/linkedin_posts.csv"
#df = pd.read_csv(file_path)

# Parse datetime
df['image_datetime'] = pd.to_datetime(df['interpolated_time'], errors='coerce')

# Drop rows with missing datetime
df = df.dropna(subset=['image_datetime'])

# === FILTER FOR SELECTED ACTOR AND POST TYPES ===
filtered_df = df.loc[
    ((df["type"] == "Original") | (df["type"] == "Share With Comment") | (df["type"] == "Share No Comment")) &
    (df["actor_profile"] == "ACoAAAAoFkIBu5s3uIdB-WTos39dsfSNq-NNQIY")
]

# === CORE METRICS ===
filtered_df['date'] = filtered_df['image_datetime'].dt.date
filtered_df['week'] = filtered_df['image_datetime'].dt.isocalendar().week
filtered_df['year'] = filtered_df['image_datetime'].dt.year
filtered_df['word_count'] = filtered_df['post_text'].fillna("").apply(lambda x: len(str(x).split()))

# === ADDITIONAL METRICS ===
filtered_df['day_of_week'] = filtered_df['image_datetime'].dt.day_name()
filtered_df['char_count'] = filtered_df['post_text'].fillna("").apply(len)
filtered_df['emoji_count'] = filtered_df['post_text'].fillna("").apply(lambda x: len(re.findall(r'[^\w\s,]', x)))
filtered_df['hashtag_count'] = filtered_df['post_text'].fillna("").apply(lambda x: len(re.findall(r'#\w+', x)))

# Calculate time gaps in hours
filtered_df = filtered_df.sort_values('image_datetime')
filtered_df['post_gap_seconds'] = filtered_df['image_datetime'].diff().dt.total_seconds().fillna(0)
filtered_df['post_gap_minutes'] = filtered_df['image_datetime'].diff().dt.total_seconds().div(60).fillna(0)
filtered_df['post_gap_hours'] = filtered_df['image_datetime'].diff().dt.total_seconds().div(3600).fillna(0)



# === DAILY STATS ===
posts_pivot = filtered_df.pivot_table(
    index='date',
    columns='type',
    values='entity_id',
    aggfunc='count',
    fill_value=0
).reset_index()

words_pivot = filtered_df.pivot_table(
    index='date',
    columns='type',
    values='word_count',
    aggfunc='mean',
    fill_value=0
).reset_index()

daily_stats = posts_pivot.merge(words_pivot, on='date', suffixes=(' Count', ' Avg Words'))

# === WEEKLY STATS ===
"""weekly_stats = filtered_df.groupby(['year', 'week']).agg(
    posts_per_week=('post_text', 'count'),
    avg_words_per_post=('word_count', 'mean')
).reset_index() """

# === WEEKLY STATS ===
weekly_posts_pivot = filtered_df.pivot_table(
    index=['year', 'week'],
    columns='type',
    values='entity_id',
    aggfunc='count',
    fill_value=0
).reset_index()

weekly_words_pivot = filtered_df.pivot_table(
    index=['year', 'week'],
    columns='type',
    values='word_count',
    aggfunc='mean',
    fill_value=0
).reset_index()

weekly_stats = weekly_posts_pivot.merge(weekly_words_pivot, on=['year', 'week'], suffixes=(' Count', ' Avg Words'))

# === DAY OF WEEK STATS ===
"""day_of_week_stats = filtered_df.groupby('day_of_week').agg(
    total_posts=('post_text', 'count'),
    avg_word_count=('word_count', 'mean'),
    avg_char_count=('char_count', 'mean'),
    avg_emoji_count=('emoji_count', 'mean'),
    avg_hashtag_count=('hashtag_count', 'mean')
).reset_index() """

# === DAY OF WEEK STATS ===
day_of_week_posts_pivot = filtered_df.pivot_table(
    index='day_of_week',
    columns='type',
    values='entity_id',
    aggfunc='count',
    fill_value=0
).reset_index()

day_of_week_words_pivot = filtered_df.pivot_table(
    index='day_of_week',
    columns='type',
    values='word_count',
    aggfunc='mean',
    fill_value=0
).reset_index()

day_of_week_stats = day_of_week_posts_pivot.merge(day_of_week_words_pivot, on='day_of_week', suffixes=(' Count', ' Avg Words'))

# === OVERALL METRICS ===
"""
overall_stats = {
    "total_posts": len(filtered_df),
    "unique_post_days": filtered_df['date'].nunique(),
    "average_posts_per_day": filtered_df.groupby('date').size().mean(),
    "average_word_count": filtered_df['word_count'].mean(),
    "max_word_count": filtered_df['word_count'].max(),
    "min_word_count": filtered_df['word_count'].min(),
    "most_active_day": filtered_df['date'].value_counts().idxmax(),
    "most_active_day_count": filtered_df['date'].value_counts().max()
} """
# === OVERALL METRICS SPLIT BY TYPE ===
overall_by_type = filtered_df.groupby('type').agg(
    total_posts=('post_text', 'count'),
    unique_post_days=('date', 'nunique'),
    average_posts_per_day=('date', lambda x: x.value_counts().mean()),
    average_word_count=('word_count', 'mean'),
    max_word_count=('word_count', 'max'),
    min_word_count=('word_count', 'min'),
    most_active_day=('date', lambda x: x.value_counts().idxmax()),
    most_active_day_count=('date', lambda x: x.value_counts().max())
).reset_index()

# === EXPORT TO MULTI-SHEET EXCEL ===
output_path = "/content/drive/MyDrive/LinkedInData/linkedin_combined_stats.xlsx"

with pd.ExcelWriter(output_path, engine='xlsxwriter') as writer:
    # DAILY STATS
    daily_stats.to_excel(writer, sheet_name='Daily_Stats', index=False)

    # WEEKLY STATS
    weekly_stats.to_excel(writer, sheet_name='Weekly_Stats', index=False)

    # DAY OF WEEK STATS
    day_of_week_stats.to_excel(writer, sheet_name='DayOfWeek_Stats', index=False)

    # OVERALL METRICS
    #overall_df = pd.DataFrame([overall_by_type])
    #overall_df.to_excel(writer, sheet_name='Overall_Stats', index=False)
    overall_by_type.to_excel(writer, sheet_name='Overall_Stats', index=False)

    # FULL ENHANCED POST DATA
    enhanced_columns = [
    "entity_id",
    "resharedUpdate_id",
    "post_text",
    "actor_description",
    "actor_name",
    "actor_profile",
    "actor_backendUrn",
    "share_url",
    "type",
    "numReactions",
    "numLikes",
    "numInterests",
    "numAppreciates",
    "numEntertains",
    "numEmpathys",
    "numPraises",
    "numComments",
    "numShares",
    "image_epoch",
    "image_epoch_interpolated",
    "interpolated_time",
    "date",
    "week",
    "year",
    "word_count",
    "day_of_week",
    "char_count",
    "emoji_count",
    "hashtag_count",
    "post_gap_seconds",
    "post_gap_minutes",
    "post_gap_hours"
    ]

    # Now write it in this column order:
    filtered_df.to_excel(writer, sheet_name='Enhanced_Post_Data', index=False, columns=enhanced_columns)

print("✅ Excel file created:", output_path)